diff --git a/include/common/tmsg.h b/include/common/tmsg.h index af44184de6..90147a3c31 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2295,17 +2295,6 @@ int32_t tSerializeSCMCreateStreamReq(void* buf, int32_t bufLen, const SCMCreateS int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStreamReq* pReq); void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq); -typedef struct { - char name[TSDB_STREAM_FNAME_LEN]; - int64_t streamId; - char* sql; - char* executorMsg; -} SMVCreateStreamReq, SMSCreateStreamReq; - -typedef struct { - int64_t streamId; -} SMVCreateStreamRsp, SMSCreateStreamRsp; - enum { TOPIC_SUB_TYPE__DB = 1, TOPIC_SUB_TYPE__TABLE, @@ -2327,16 +2316,9 @@ int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTo int32_t tDeserializeSCMCreateTopicReq(void* buf, int32_t bufLen, SCMCreateTopicReq* pReq); void tFreeSCMCreateTopicReq(SCMCreateTopicReq* pReq); -typedef struct { - int64_t topicId; -} SCMCreateTopicRsp; - -int32_t tSerializeSCMCreateTopicRsp(void* buf, int32_t bufLen, const SCMCreateTopicRsp* pRsp); -int32_t tDeserializeSCMCreateTopicRsp(void* buf, int32_t bufLen, SCMCreateTopicRsp* pRsp); - typedef struct { int64_t consumerId; -} SMqConsumerLostMsg, SMqConsumerRecoverMsg, SMqConsumerClearMsg; +} SMqConsumerRecoverMsg, SMqConsumerClearMsg; typedef struct { int64_t consumerId; @@ -2348,6 +2330,7 @@ typedef struct { int8_t autoCommit; int32_t autoCommitInterval; int8_t resetOffsetCfg; + int8_t enableReplay; } SCMSubscribeReq; static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { @@ -2367,6 +2350,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc tlen += taosEncodeFixedI8(buf, pReq->autoCommit); tlen += taosEncodeFixedI32(buf, pReq->autoCommitInterval); tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg); + tlen += taosEncodeFixedI8(buf, pReq->enableReplay); return tlen; } @@ -2390,71 +2374,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq buf = taosDecodeFixedI8(buf, &pReq->autoCommit); buf = taosDecodeFixedI32(buf, &pReq->autoCommitInterval); buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg); - return buf; -} - -typedef struct SMqSubTopic { - int32_t vgId; - int64_t topicId; - SEpSet epSet; -} SMqSubTopic; - -typedef struct { - int32_t topicNum; - SMqSubTopic topics[]; -} SCMSubscribeRsp; - -static FORCE_INLINE int32_t tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) { - int32_t tlen = 0; - tlen += taosEncodeFixedI32(buf, pRsp->topicNum); - for (int32_t i = 0; i < pRsp->topicNum; i++) { - tlen += taosEncodeFixedI32(buf, pRsp->topics[i].vgId); - tlen += taosEncodeFixedI64(buf, pRsp->topics[i].topicId); - tlen += taosEncodeSEpSet(buf, &pRsp->topics[i].epSet); - } - return tlen; -} - -static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) { - buf = taosDecodeFixedI32(buf, &pRsp->topicNum); - for (int32_t i = 0; i < pRsp->topicNum; i++) { - buf = taosDecodeFixedI32(buf, &pRsp->topics[i].vgId); - buf = taosDecodeFixedI64(buf, &pRsp->topics[i].topicId); - buf = taosDecodeSEpSet(buf, &pRsp->topics[i].epSet); - } - return buf; -} - -typedef struct { - int64_t topicId; - int64_t consumerId; - int64_t consumerGroupId; - int64_t offset; - char* sql; - char* logicalPlan; - char* physicalPlan; -} SMVSubscribeReq; - -static FORCE_INLINE int32_t tSerializeSMVSubscribeReq(void** buf, SMVSubscribeReq* pReq) { - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pReq->topicId); - tlen += taosEncodeFixedI64(buf, pReq->consumerId); - tlen += taosEncodeFixedI64(buf, pReq->consumerGroupId); - tlen += taosEncodeFixedI64(buf, pReq->offset); - tlen += taosEncodeString(buf, pReq->sql); - tlen += taosEncodeString(buf, pReq->logicalPlan); - tlen += taosEncodeString(buf, pReq->physicalPlan); - return tlen; -} - -static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq* pReq) { - buf = taosDecodeFixedI64(buf, &pReq->topicId); - buf = taosDecodeFixedI64(buf, &pReq->consumerId); - buf = taosDecodeFixedI64(buf, &pReq->consumerGroupId); - buf = taosDecodeFixedI64(buf, &pReq->offset); - buf = taosDecodeString(buf, &pReq->sql); - buf = taosDecodeString(buf, &pReq->logicalPlan); - buf = taosDecodeString(buf, &pReq->physicalPlan); + buf = taosDecodeFixedI8(buf, &pReq->enableReplay); return buf; } @@ -3534,6 +3454,7 @@ typedef struct { int64_t consumerId; int64_t timeout; STqOffsetVal reqOffset; + int8_t enableReplay; } SMqPollReq; int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq); @@ -3593,6 +3514,7 @@ typedef struct { SArray* blockData; SArray* blockTbName; SArray* blockSchema; + int64_t sleepTime; } SMqDataRsp; int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp); diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 0a240dd8f5..fa110d66e2 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -222,6 +222,7 @@ typedef struct SStoreTqReader { bool (*tqReaderNextBlockInWal)(); bool (*tqNextBlockImpl)(); // todo remove it SSDataBlock* (*tqGetResultBlock)(); + int64_t (*tqGetResultBlockTime)(); void (*tqReaderSetColIdList)(); int32_t (*tqReaderSetQueryTableList)(); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 39bf2b5681..eb991379c4 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -792,6 +792,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TMQ_NEED_INITIALIZED TAOS_DEF_ERROR_CODE(0, 0x4010) #define TSDB_CODE_TMQ_NO_COMMITTED TAOS_DEF_ERROR_CODE(0, 0x4011) #define TSDB_CODE_TMQ_SAME_COMMITTED_VALUE TAOS_DEF_ERROR_CODE(0, 0x4012) +#define TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP TAOS_DEF_ERROR_CODE(0, 0x4013) +#define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 10f8b89f4d..91c21fe344 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -683,7 +683,7 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i); if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) { taosHashCleanup(hashTmp); - return -1; + return TSDB_CODE_SML_INVALID_DATA; } } taosHashCleanup(hashTmp); diff --git a/source/client/src/clientSmlJson.c b/source/client/src/clientSmlJson.c index f9076112c4..5e656c71a7 100644 --- a/source/client/src/clientSmlJson.c +++ b/source/client/src/clientSmlJson.c @@ -256,7 +256,8 @@ int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset) { } if (unlikely(index >= OTD_JSON_FIELDS_NUM)) { - uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start) return -1; + uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start); + return TSDB_CODE_TSC_INVALID_JSON; } char *sTmp = *start; @@ -367,7 +368,8 @@ int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset) { if (unlikely(index != OTD_JSON_FIELDS_NUM) || element->tags == NULL || element->cols == NULL || element->measure == NULL || element->timestamp == NULL) { - uError("elements != %d or element parse null", OTD_JSON_FIELDS_NUM) return -1; + uError("elements != %d or element parse null", OTD_JSON_FIELDS_NUM); + return TSDB_CODE_TSC_INVALID_JSON; } return 0; } @@ -381,7 +383,8 @@ int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset) { } if (unlikely(index >= OTD_JSON_FIELDS_NUM)) { - uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start) return -1; + uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start); + return TSDB_CODE_TSC_INVALID_JSON; } if ((*start)[1] == 'm') { @@ -448,7 +451,8 @@ int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset) { } if (unlikely(index != 0 && index != OTD_JSON_FIELDS_NUM)) { - uError("elements != %d", OTD_JSON_FIELDS_NUM) return -1; + uError("elements != %d", OTD_JSON_FIELDS_NUM); + return TSDB_CODE_TSC_INVALID_JSON; } return 0; } @@ -477,7 +481,7 @@ static int32_t smlGetJsonElements(cJSON *root, cJSON ***marks) { } if (*marks[i] == NULL) { uError("smlGetJsonElements error, not find mark:%d:%s", i, jsonName[i]); - return -1; + return TSDB_CODE_TSC_INVALID_JSON; } } return TSDB_CODE_SUCCESS; @@ -816,25 +820,25 @@ static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPr int32_t size = cJSON_GetArraySize(root); if (unlikely(size != OTD_JSON_SUB_FIELDS_NUM)) { smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL); - return -1; + return TSDB_CODE_TSC_INVALID_JSON; } cJSON *value = cJSON_GetObjectItem(root, "value"); if (unlikely(!cJSON_IsNumber(value))) { smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL); - return -1; + return TSDB_CODE_TSC_INVALID_JSON; } cJSON *type = cJSON_GetObjectItem(root, "type"); if (unlikely(!cJSON_IsString(type))) { smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL); - return -1; + return TSDB_CODE_TSC_INVALID_JSON; } double timeDouble = value->valuedouble; if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) { smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL); - return -1; + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } if (timeDouble == 0) { @@ -849,32 +853,28 @@ static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPr size_t typeLen = strlen(type->valuestring); if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) { // seconds - int8_t fromPrecision = TSDB_TIME_PRECISION_SECONDS; if (smlFactorS[toPrecision] < INT64_MAX / tsInt64) { return tsInt64 * smlFactorS[toPrecision]; } - return -1; + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) { switch (type->valuestring[0]) { case 'm': case 'M': // milliseconds return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MILLI, toPrecision); - break; case 'u': case 'U': // microseconds return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MICRO, toPrecision); - break; case 'n': case 'N': return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_NANO, toPrecision); - break; default: - return -1; + return TSDB_CODE_TSC_INVALID_JSON_TYPE; } } else { - return -1; + return TSDB_CODE_TSC_INVALID_JSON_TYPE; } } @@ -895,7 +895,7 @@ static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) { double timeDouble = timestamp->valuedouble; if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) { smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL); - return -1; + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } if (unlikely(timeDouble < 0)) { @@ -911,14 +911,14 @@ static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) { if (unlikely(fromPrecision == -1)) { smlBuildInvalidDataMsg(&info->msgBuf, "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", NULL); - return -1; + return TSDB_CODE_SML_INVALID_DATA; } int64_t tsInt64 = timeDouble; if (fromPrecision == TSDB_TIME_PRECISION_SECONDS) { if (smlFactorS[toPrecision] < INT64_MAX / tsInt64) { return tsInt64 * smlFactorS[toPrecision]; } - return -1; + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } else { return convertTimePrecision(timeDouble, fromPrecision, toPrecision); } @@ -926,7 +926,7 @@ static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) { return smlParseTSFromJSONObj(info, timestamp, toPrecision); } else { smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL); - return -1; + return TSDB_CODE_TSC_INVALID_JSON; } } diff --git a/source/client/src/clientSmlLine.c b/source/client/src/clientSmlLine.c index a565fb1a21..006475654a 100644 --- a/source/client/src/clientSmlLine.c +++ b/source/client/src/clientSmlLine.c @@ -70,7 +70,7 @@ static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t le int64_t ts = smlGetTimeValue(data, len, fromPrecision, toPrecision); if (unlikely(ts == -1)) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data); - return -1; + return TSDB_CODE_SML_INVALID_DATA; } return ts; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 6ee5508048..50b8eb1eca 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -63,7 +63,7 @@ struct tmq_conf_t { int8_t resetOffset; int8_t withTbName; int8_t snapEnable; - int32_t snapBatchSize; + int8_t replayEnable; bool hbBgEnable; uint16_t port; int32_t autoCommitInterval; @@ -83,6 +83,7 @@ struct tmq_t { int8_t autoCommit; int32_t autoCommitInterval; int8_t resetOffsetCfg; + int8_t replayEnable; uint64_t consumerId; bool hbBgEnable; tmq_commit_cb* commitCb; @@ -92,19 +93,13 @@ struct tmq_t { SRWLatch lock; int8_t status; int32_t epoch; -#if 0 - int8_t epStatus; - int32_t epSkipCnt; -#endif // poll info int64_t pollCnt; int64_t totalRows; -// bool needReportOffsetRows; // timer tmr_h hbLiveTimer; tmr_h epTimer; - tmr_h reportTimer; tmr_h commitTimer; STscObj* pTscObj; // connection SArray* clientTopics; // SArray @@ -152,6 +147,8 @@ typedef struct { int32_t vgStatus; int32_t vgSkipCnt; // here used to mark the slow vgroups int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data + int64_t blockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data + int64_t blockSleepForReplay; // once empty block is received, idle for ignoreCnt then start to poll data bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp. SEpSet epSet; } SMqClientVg; @@ -360,24 +357,6 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } } - if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) { - conf->snapBatchSize = taosStr2int64(value); - return TMQ_CONF_OK; - } - -// if (strcasecmp(key, "enable.heartbeat.background") == 0) { - // if (strcasecmp(value, "true") == 0) { - // conf->hbBgEnable = true; - // return TMQ_CONF_OK; - // } else if (strcasecmp(value, "false") == 0) { - // conf->hbBgEnable = false; - // return TMQ_CONF_OK; - // } else { -// tscError("the default value of enable.heartbeat.background is true, can not be seted"); -// return TMQ_CONF_INVALID; - // } -// } - if (strcasecmp(key, "td.connect.ip") == 0) { conf->ip = taosStrdup(value); return TMQ_CONF_OK; @@ -398,6 +377,18 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value return TMQ_CONF_OK; } + if (strcasecmp(key, "enable.replay") == 0) { + if (strcasecmp(value, "true") == 0) { + conf->replayEnable = true; + return TMQ_CONF_OK; + } else if (strcasecmp(value, "false") == 0) { + conf->replayEnable = false; + return TMQ_CONF_OK; + } else { + return TMQ_CONF_INVALID; + } + } + if (strcasecmp(key, "td.connect.db") == 0) { return TMQ_CONF_OK; } @@ -1075,6 +1066,10 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->commitCb = conf->commitCb; pTmq->commitCbUserParam = conf->commitCbUserParam; pTmq->resetOffsetCfg = conf->resetOffset; + pTmq->replayEnable = conf->replayEnable; + if(conf->replayEnable){ + pTmq->autoCommit = false; + } taosInitRWLatch(&pTmq->lock); pTmq->hbBgEnable = conf->hbBgEnable; @@ -1424,6 +1419,8 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE, .vgSkipCnt = 0, .emptyBlockReceiveTs = 0, + .blockReceiveTs = 0, + .blockSleepForReplay = 0, .numOfRows = pInfo ? pInfo->numOfRows : 0, }; @@ -1695,6 +1692,12 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { continue; } + if (tmq->replayEnable && taosGetTimestampMs() - pVg->blockReceiveTs < pVg->blockSleepForReplay) { // less than 10ms + tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay", tmq->consumerId, + tmq->epoch, pVg->vgId, pVg->blockSleepForReplay); + continue; + } + int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus == TMQ_VG_STATUS__WAIT) { int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); @@ -1816,6 +1819,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); tmq->totalRows += numOfRows; pVg->emptyBlockReceiveTs = 0; + if(tmq->replayEnable){ + pVg->blockReceiveTs = taosGetTimestampMs(); + pVg->blockSleepForReplay = pRsp->rsp.sleepTime; + } tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index fd39ae98d9..7d285434d9 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4352,31 +4352,6 @@ void tFreeSCMCreateTopicReq(SCMCreateTopicReq *pReq) { } } -int32_t tSerializeSCMCreateTopicRsp(void *buf, int32_t bufLen, const SCMCreateTopicRsp *pRsp) { - SEncoder encoder = {0}; - tEncoderInit(&encoder, buf, bufLen); - - if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeI64(&encoder, pRsp->topicId) < 0) return -1; - tEndEncode(&encoder); - - int32_t tlen = encoder.pos; - tEncoderClear(&encoder); - return tlen; -} - -int32_t tDeserializeSCMCreateTopicRsp(void *buf, int32_t bufLen, SCMCreateTopicRsp *pRsp) { - SDecoder decoder = {0}; - tDecoderInit(&decoder, buf, bufLen); - - if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeI64(&decoder, &pRsp->topicId) < 0) return -1; - tEndDecode(&decoder); - - tDecoderClear(&decoder); - return 0; -} - int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -5983,6 +5958,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1; if (tEncodeI64(&encoder, pReq->timeout) < 0) return -1; if (tSerializeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1; + if (tEncodeI8(&encoder, pReq->enableReplay) < 0) return -1; tEndEncode(&encoder); @@ -6019,6 +5995,10 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { if (tDecodeI64(&decoder, &pReq->timeout) < 0) return -1; if (tDerializeSTqOffsetVal(&decoder, &pReq->reqOffset) < 0) return -1; + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI8(&decoder, &pReq->enableReplay) < 0) return -1; + } + tEndDecode(&decoder); tDecoderClear(&decoder); @@ -7795,6 +7775,7 @@ int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { } } } + if (tEncodeI64(pEncoder, pRsp->sleepTime) < 0) return -1; return 0; } @@ -7840,6 +7821,8 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { } } } + if (tDecodeI64(pDecoder, &pRsp->sleepTime) < 0) return -1; + return 0; } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 844e69e659..1715f56091 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -490,32 +490,6 @@ typedef struct { char filterTb[TSDB_TABLE_NAME_LEN]; } SShowObj; -typedef struct { - int64_t id; - int8_t type; - int8_t replica; - int16_t numOfColumns; - int32_t rowSize; - int32_t numOfRows; - int32_t numOfReads; - int32_t payloadLen; - void* pIter; - SMnode* pMnode; - char db[TSDB_DB_FNAME_LEN]; - int16_t offset[TSDB_MAX_COLUMNS]; - int32_t bytes[TSDB_MAX_COLUMNS]; - char payload[]; -} SSysTableRetrieveObj; - -typedef struct { - char key[TSDB_PARTITION_KEY_LEN]; - int64_t dbUid; - int64_t offset; -} SMqOffsetObj; - -int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset); -void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset); - typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN]; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 7273e13317..05156e1427 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -18,6 +18,7 @@ #include "mndPrivilege.h" #include "mndVgroup.h" #include "mndShow.h" +#include "mndDb.h" #include "mndSubscribe.h" #include "mndTopic.h" #include "mndTrans.h" @@ -124,7 +125,7 @@ void mndRebCntDec() { } } -static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser) { +static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) { int32_t numOfTopics = taosArrayGetSize(pTopicList); for (int32_t i = 0; i < numOfTopics; i++) { @@ -139,6 +140,22 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode * return -1; } + if(enableReplay){ + if(pTopic->subType != TOPIC_SUB_TYPE__COLUMN){ + return TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT; + }else if(pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) { + SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db); + if (pDb == NULL) { + mndReleaseTopic(pMnode, pTopic); + return -1; + } + if (pDb->cfg.numOfVgroups != 1) { + return TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP; + } + mndReleaseDb(pMnode, pDb); + } + } + mndTransSetDbName(pTrans, pOneTopic, NULL); if(mndTransCheckConflict(pMnode, pTrans) != 0){ mndReleaseTopic(pMnode, pTopic); @@ -177,7 +194,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { if (pTrans == NULL) { goto FAIL; } - if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user) != 0){ + if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false) != 0){ goto FAIL; } @@ -697,7 +714,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { goto _over; } - code = validateTopics(pTrans, pTopicList, pMnode, pMsg->info.conn.user); + code = validateTopics(pTrans, pTopicList, pMnode, pMsg->info.conn.user, subscribe.enableReplay); if (code != TSDB_CODE_SUCCESS) { goto _over; } diff --git a/source/dnode/mnode/impl/src/mndDump.c b/source/dnode/mnode/impl/src/mndDump.c index 62b5cb00e6..481495cbe5 100644 --- a/source/dnode/mnode/impl/src/mndDump.c +++ b/source/dnode/mnode/impl/src/mndDump.c @@ -330,24 +330,6 @@ void dumpSubscribe(SSdb *pSdb, SJson *json) { } } -void dumpOffset(SSdb *pSdb, SJson *json) { - void *pIter = NULL; - SJson *items = tjsonAddArrayToObject(json, "offsets"); - - while (1) { - SMqOffsetObj *pObj = NULL; - pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pObj); - if (pIter == NULL) break; - - SJson *item = tjsonCreateObject(); - tjsonAddItemToArray(items, item); - tjsonAddStringToObject(item, "key", pObj->key); - tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid)); - tjsonAddStringToObject(item, "offset", i642str(pObj->offset)); - sdbRelease(pSdb, pObj); - } -} - void dumpStream(SSdb *pSdb, SJson *json) { void *pIter = NULL; SJson *items = tjsonAddArrayToObject(json, "streams"); @@ -608,7 +590,7 @@ void mndDumpSdb() { dumpTopic(pSdb, json); dumpConsumer(pSdb, json); dumpSubscribe(pSdb, json); - dumpOffset(pSdb, json); +// dumpOffset(pSdb, json); dumpStream(pSdb, json); dumpAcct(pSdb, json); dumpAuth(pSdb, json); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 94fd6027c0..4b5cfc830f 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -298,11 +298,6 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime); atomic_exchange_32(&pOldTopic->version, pNewTopic->version); - /*taosWLockLatch(&pOldTopic->lock);*/ - - // TODO handle update - - /*taosWUnLockLatch(&pOldTopic->lock);*/ return 0; } @@ -320,23 +315,6 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) { sdbRelease(pSdb, pTopic); } -static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) { - int32_t contLen = sizeof(SDDropTopicReq); - - SDDropTopicReq *pDrop = taosMemoryCalloc(1, contLen); - if (pDrop == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - pDrop->head.contLen = htonl(contLen); - pDrop->head.vgId = htonl(pVgroup->vgId); - memcpy(pDrop->name, pTopic->name, TSDB_TOPIC_FNAME_LEN); - pDrop->tuid = htobe64(pTopic->uid); - - return pDrop; -} - static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) { terrno = TSDB_CODE_MND_INVALID_TOPIC; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index e15f5f911d..ee0f0e2eeb 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -213,7 +213,7 @@ typedef struct STqReader { SPackedData msg; SSubmitReq2 submit; int32_t nextBlk; - int64_t lastBlkUid; + int64_t lastTs; SWalReader *pWalReader; SMeta *pVnodeMeta; SHashObj *tbIdHash; @@ -241,6 +241,7 @@ bool tqNextBlockInWal(STqReader *pReader, const char *idstr); bool tqNextBlockImpl(STqReader *pReader, const char *idstr); SWalReader *tqGetWalReader(STqReader *pReader); SSDataBlock *tqGetResultBlock(STqReader *pReader); +int64_t tqGetResultBlockTime(STqReader *pReader); int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, const char *id); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index ee96e602d8..5d4ea8699b 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -47,6 +47,7 @@ typedef struct STqOffsetStore STqOffsetStore; // tqPush #define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1) #define STREAM_EXEC_TASK_STATUS_CHECK_ID (-2) +#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) // tqExec typedef struct { @@ -91,6 +92,10 @@ typedef struct { STqExecHandle execHandle; // exec SRpcMsg* msg; tq_handle_status status; + + // for replay + SSDataBlock* block; + int64_t blockTime; } STqHandle; struct STQ { @@ -108,17 +113,13 @@ struct STQ { SStreamMeta* pStreamMeta; }; -typedef struct { - int32_t size; -} STqOffsetHead; - int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle); int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); void tqDestroyTqHandle(void* data); // tqRead int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); -int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset); +int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest); int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId); // tqExec diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index adf3abe4d9..2fda70bcc1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -82,6 +82,9 @@ void tqDestroyTqHandle(void* data) { taosMemoryFree(pData->msg); pData->msg = NULL; } + if (pData->block != NULL){ + blockDataDestroy(pData->block); + } } static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) { diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 11bb737225..5e67f3c3ac 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -37,11 +37,9 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { int32_t vgId = TD_VID(pStore->pTq->pVnode); int64_t code = 0; - - STqOffsetHead head = {0}; - + int32_t size = 0; while (1) { - if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) { + if ((code = taosReadFile(pFile, &size, INT_BYTES)) != INT_BYTES) { if (code == 0) { break; } else { @@ -49,7 +47,6 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { } } - int32_t size = htonl(head.size); void* pMemBuf = taosMemoryCalloc(1, size); if (pMemBuf == NULL) { tqError("vgId:%d failed to restore offset from file, since out of memory, malloc size:%d", vgId, size); @@ -175,11 +172,11 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { return -1; } - int32_t totLen = sizeof(STqOffsetHead) + bodyLen; + int32_t totLen = INT_BYTES + bodyLen; void* buf = taosMemoryCalloc(1, totLen); - void* abuf = POINTER_SHIFT(buf, sizeof(STqOffsetHead)); + void* abuf = POINTER_SHIFT(buf, INT_BYTES); - ((STqOffsetHead*)buf)->size = htonl(bodyLen); + *(int32_t*)buf = bodyLen; SEncoder encoder; tEncoderInit(&encoder, abuf, bodyLen); tEncodeSTqOffset(&encoder, pOffset); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 4d470ee5b6..65914cc70e 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -475,6 +475,10 @@ SSDataBlock* tqGetResultBlock (STqReader* pReader) { return pReader->pResBlock; } +int64_t tqGetResultBlockTime(STqReader *pReader){ + return pReader->lastTs; +} + bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { if (pReader->msg.msgStr == NULL) { return false; @@ -641,7 +645,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* int32_t sversion = pSubmitTbData->sver; int64_t suid = pSubmitTbData->suid; int64_t uid = pSubmitTbData->uid; - pReader->lastBlkUid = uid; + pReader->lastTs = pSubmitTbData->ctimeMs; pBlock->info.id.uid = uid; pBlock->info.version = pReader->msg.ver; @@ -783,9 +787,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas } int32_t sversion = pSubmitTbData->sver; - int64_t suid = pSubmitTbData->suid; int64_t uid = pSubmitTbData->uid; - pReader->lastBlkUid = uid; tDeleteSchemaWrapper(pReader->pSchemaWrapper); pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index cbe3ffee9e..705fb86fab 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -64,7 +64,23 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in return 0; } -int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { +int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res){ + uint64_t ts = 0; + qStreamSetOpen(task); + + tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId); + int32_t code = qExecTask(task, res, &ts); + if (code != TSDB_CODE_SUCCESS) { + tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code)); + terrno = code; + return -1; + } + + tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, *res); + return 0; +} + +int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) { const int32_t MAX_ROWS_TO_RETURN = 4096; int32_t vgId = TD_VID(pTq->pVnode); @@ -80,34 +96,63 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs while (1) { SSDataBlock* pDataBlock = NULL; - uint64_t ts = 0; - qStreamSetOpen(task); - - tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId); - code = qExecTask(task, &pDataBlock, &ts); - if (code != TSDB_CODE_SUCCESS) { - tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code)); - terrno = code; - return -1; - } - - tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, - pDataBlock); - // current scan should be stopped asap, since the rebalance occurs. - if (pDataBlock == NULL) { - break; - } - - code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); - if (code != TSDB_CODE_SUCCESS) { - tqError("vgId:%d, failed to add block to rsp msg", vgId); + code = getDataBlock(task, pHandle, vgId, &pDataBlock); + if (code != 0){ return code; } - pRsp->blockNum++; - totalRows += pDataBlock->info.rows; - if (totalRows >= MAX_ROWS_TO_RETURN) { + if(pRequest->enableReplay){ + if(IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL){ + blockDataDestroy(pHandle->block); + pHandle->block = NULL; + } + if(pHandle->block == NULL){ + if (pDataBlock == NULL) { + break; + } + STqOffsetVal offset = {0}; + qStreamExtractOffset(task, &offset); + pHandle->block = createDataBlock(); + copyDataBlock(pHandle->block, pDataBlock); + pHandle->blockTime = offset.ts; + code = getDataBlock(task, pHandle, vgId, &pDataBlock); + if (code != 0){ + return code; + } + } + + code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d, failed to add block to rsp msg", vgId); + return code; + } + + pRsp->blockNum++; + if (pDataBlock == NULL) { + break; + } + copyDataBlock(pHandle->block, pDataBlock); + + STqOffsetVal offset = {0}; + qStreamExtractOffset(task, &offset); + pRsp->sleepTime = offset.ts - pHandle->blockTime; + pHandle->blockTime = offset.ts; break; + }else{ + if (pDataBlock == NULL) { + break; + } + code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d, failed to add block to rsp msg", vgId); + return code; + } + + pRsp->blockNum++; + totalRows += pDataBlock->info.rows; + if (totalRows >= MAX_ROWS_TO_RETURN) { + break; + } } } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 04695c1f63..a4c3d395e3 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -15,8 +15,6 @@ #include "tq.h" -#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) - static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId); @@ -142,7 +140,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, tqInitDataRsp(&dataRsp, *pOffset); qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); - int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); + int code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest); if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) { goto end; } diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index c72ecd4824..a6a2e128d8 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -131,6 +131,7 @@ void initTqAPI(SStoreTqReader* pTq) { pTq->tqGetResultBlock = tqGetResultBlock; pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut; + pTq->tqGetResultBlockTime = tqGetResultBlockTime; } void initStateStoreAPI(SStateStore* pStore) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 474128007a..913e246f63 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1862,6 +1862,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { // curVersion move to next tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion); + // use ts to pass time when replay, because ts not used if type is log + pTaskInfo->streamInfo.currentOffset.ts = pAPI->tqReaderFn.tqGetResultBlockTime(pInfo->tqReader); + if (hasResult) { qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 9832720994..dd51b99cd1 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -654,6 +654,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE, "Topic num out of range") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE, "Group num out of range 100") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed value") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only one vgroup if subscribe super table") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") diff --git a/utils/test/c/tmqOffset.c b/utils/test/c/tmqOffset.c index 7225cb87bd..9699e71f24 100644 --- a/utils/test/c/tmqOffset.c +++ b/utils/test/c/tmqOffset.c @@ -7,18 +7,14 @@ #include "tlog.h" #include "tmsg.h" -typedef struct { - int32_t size; -} STqOffsetHead; - int32_t tqOffsetRestoreFromFile(const char* fname) { TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); if (pFile != NULL) { - STqOffsetHead head = {0}; int32_t code; while (1) { - if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) { + int32_t size = 0; + if ((code = taosReadFile(pFile, &size, INT_BYTES)) != INT_BYTES) { if (code == 0) { break; } else { @@ -26,7 +22,6 @@ int32_t tqOffsetRestoreFromFile(const char* fname) { return -1; } } - int32_t size = htonl(head.size); void* memBuf = taosMemoryCalloc(1, size); if (memBuf == NULL) { printf("memBuf == NULL\n");