diff --git a/example/src/tmq.c b/example/src/tmq.c index 26e5ea82c1..094fd94bfc 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -66,7 +66,7 @@ int32_t init_env() { } int32_t create_topic() { - printf("create topic"); + printf("create topic\n"); TAOS_RES* pRes; TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (pConn == NULL) { @@ -91,6 +91,10 @@ int32_t create_topic() { return 0; } +void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) { + printf("commit %d\n", resp); +} + tmq_t* build_consumer() { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -103,6 +107,7 @@ tmq_t* build_consumer() { tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "group.id", "tg2"); + tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print); tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); return tmq; } @@ -144,7 +149,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { } void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { - static const int MIN_COMMIT_COUNT = 1000; + static const int MIN_COMMIT_COUNT = 1; int msg_count = 0; tmq_resp_err_t err; @@ -214,6 +219,6 @@ int main(int argc, char* argv[]) { tmq_t* tmq = build_consumer(); tmq_list_t* topic_list = build_topic_list(); /*perf_loop(tmq, topic_list);*/ - basic_consume_loop(tmq, topic_list); - /*sync_consume_loop(tmq, topic_list);*/ + /*basic_consume_loop(tmq, topic_list);*/ + sync_consume_loop(tmq, topic_list); } diff --git a/include/client/taos.h b/include/client/taos.h index a960d37937..2c8135c8ff 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -198,8 +198,8 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLi /* --------------------------TMQ INTERFACE------------------------------- */ enum tmq_resp_err_t { + TMQ_RESP_ERR__FAIL = -1, TMQ_RESP_ERR__SUCCESS = 0, - TMQ_RESP_ERR__FAIL = 1, }; typedef enum tmq_resp_err_t tmq_resp_err_t; @@ -226,7 +226,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list); #if 0 DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq); -DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics); +DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics); #endif DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); @@ -238,6 +238,7 @@ DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t * #if 0 DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); #endif +DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset); /* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ enum tmq_conf_res_t { diff --git a/include/common/common.h b/include/common/common.h index fd5b6717ab..8fa2d03d6d 100644 --- a/include/common/common.h +++ b/include/common/common.h @@ -16,7 +16,6 @@ #ifndef TDENGINE_COMMON_H #define TDENGINE_COMMON_H - #ifdef __cplusplus extern "C" { #endif @@ -43,14 +42,16 @@ extern "C" { // int16_t bytes; // } SSchema; -#define TMQ_REQ_TYPE_COMMIT_ONLY 0 -#define TMQ_REQ_TYPE_CONSUME_ONLY 1 -#define TMQ_REQ_TYPE_CONSUME_AND_COMMIT 2 +enum { + TMQ_CONF__RESET_OFFSET__LATEST = -1, + TMQ_CONF__RESET_OFFSET__EARLIEAST = -2, + TMQ_CONF__RESET_OFFSET__NONE = -3, +}; typedef struct { uint32_t numOfTables; - SArray *pGroupList; - SHashObj *map; // speedup acquire the tableQueryInfo by table uid + SArray* pGroupList; + SHashObj* map; // speedup acquire the tableQueryInfo by table uid } STableGroupInfo; typedef struct SColumnDataAgg { @@ -79,14 +80,14 @@ typedef struct SConstantItem { // info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList); typedef struct SSDataBlock { - SColumnDataAgg *pBlockAgg; - SArray *pDataBlock; // SArray - SArray *pConstantList; // SArray, it is a constant/tags value of the corresponding result value. - SDataBlockInfo info; + SColumnDataAgg* pBlockAgg; + SArray* pDataBlock; // SArray + SArray* pConstantList; // SArray, it is a constant/tags value of the corresponding result value. + SDataBlockInfo info; } SSDataBlock; typedef struct SVarColAttr { - int32_t *offset; // start position for each entry in the list + int32_t* offset; // start position for each entry in the list uint32_t length; // used buffer size that contain the valid data uint32_t allocLen; // allocated buffer size } SVarColAttr; @@ -94,11 +95,11 @@ typedef struct SVarColAttr { // pBlockAgg->numOfNull == info.rows, all data are null // pBlockAgg->numOfNull == 0, no data are null. typedef struct SColumnInfoData { - SColumnInfo info; // TODO filter info needs to be removed - bool hasNull;// if current column data has null value. - char *pData; // the corresponding block data in memory + SColumnInfo info; // TODO filter info needs to be removed + bool hasNull; // if current column data has null value. + char* pData; // the corresponding block data in memory union { - char *nullbitmap; // bitmap, one bit for each item in the list + char* nullbitmap; // bitmap, one bit for each item in the list SVarColAttr varmeta; }; } SColumnInfoData; @@ -149,7 +150,6 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp int32_t tlen = 0; int32_t sz = 0; tlen += taosEncodeFixedI64(buf, pRsp->consumerId); - tlen += taosEncodeFixedI64(buf, pRsp->committedOffset); tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum); @@ -170,7 +170,6 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) { int32_t sz; buf = taosDecodeFixedI64(buf, &pRsp->consumerId); - buf = taosDecodeFixedI64(buf, &pRsp->committedOffset); buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum); @@ -250,11 +249,11 @@ typedef struct SSqlExpr { char token[TSDB_COL_NAME_LEN]; // original token SSchema resSchema; - int32_t numOfCols; - SColumn* pColumns; // data columns that are required by query - int32_t interBytes; // inter result buffer size - int16_t numOfParams; // argument value of each function - SVariant param[3]; // parameters are not more than 3 + int32_t numOfCols; + SColumn* pColumns; // data columns that are required by query + int32_t interBytes; // inter result buffer size + int16_t numOfParams; // argument value of each function + SVariant param[3]; // parameters are not more than 3 } SSqlExpr; typedef struct SExprInfo { @@ -271,7 +270,7 @@ typedef struct SSessionWindow { SColumn col; } SSessionWindow; -#define QUERY_ASC_FORWARD_STEP 1 +#define QUERY_ASC_FORWARD_STEP 1 #define QUERY_DESC_FORWARD_STEP -1 #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5164fb6ccd..ae3586e735 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -422,8 +422,8 @@ typedef struct { } SColumnInfo; typedef struct { - uint64_t uid; - TSKEY key; // last accessed ts, for subscription + int64_t uid; + TSKEY key; // last accessed ts, for subscription } STableIdInfo; typedef struct STimeWindow { @@ -554,8 +554,8 @@ int32_t tSerializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq); int32_t tDeserializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq); typedef struct { - char db[TSDB_DB_FNAME_LEN]; - uint64_t uid; + char db[TSDB_DB_FNAME_LEN]; + int64_t uid; } SDropDbRsp; int32_t tSerializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp); @@ -570,12 +570,12 @@ int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); int32_t tDeserializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); typedef struct { - char db[TSDB_DB_FNAME_LEN]; - uint64_t uid; - int32_t vgVersion; - int32_t vgNum; - int8_t hashMethod; - SArray* pVgroupInfos; // Array of SVgroupInfo + char db[TSDB_DB_FNAME_LEN]; + int64_t uid; + int32_t vgVersion; + int32_t vgNum; + int8_t hashMethod; + SArray* pVgroupInfos; // Array of SVgroupInfo } SUseDbRsp; int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp); @@ -725,7 +725,7 @@ typedef struct { int32_t vgId; int32_t dnodeId; char db[TSDB_DB_FNAME_LEN]; - uint64_t dbUid; + int64_t dbUid; int32_t vgVersion; int32_t cacheBlockSize; int32_t totalBlocks; @@ -753,10 +753,10 @@ int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pR int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); typedef struct { - int32_t vgId; - int32_t dnodeId; - uint64_t dbUid; - char db[TSDB_DB_FNAME_LEN]; + int32_t vgId; + int32_t dnodeId; + int64_t dbUid; + char db[TSDB_DB_FNAME_LEN]; } SDropVnodeReq, SSyncVnodeReq, SCompactVnodeReq; int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq); @@ -796,7 +796,7 @@ typedef struct { char tbName[TSDB_TABLE_NAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN]; - uint64_t dbId; + int64_t dbId; int32_t numOfTags; int32_t numOfColumns; int8_t precision; @@ -1268,7 +1268,7 @@ typedef struct { typedef struct { SMsgHead head; char name[TSDB_TABLE_FNAME_LEN]; - uint64_t tuid; + int64_t tuid; int32_t sverson; int32_t execLen; char* executor; @@ -1279,11 +1279,11 @@ typedef struct { typedef struct { SMsgHead head; char name[TSDB_TABLE_FNAME_LEN]; - uint64_t tuid; + int64_t tuid; } SDDropTopicReq; typedef struct SVCreateTbReq { - uint64_t ver; // use a general definition + int64_t ver; // use a general definition char* name; uint32_t ttl; uint32_t keep; @@ -1314,8 +1314,8 @@ int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); typedef struct { - uint64_t ver; // use a general definition - SArray* pArray; + int64_t ver; // use a general definition + SArray* pArray; } SVCreateTbBatchReq; typedef struct { @@ -1325,7 +1325,7 @@ int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq); void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq); typedef struct { - uint64_t ver; + int64_t ver; char* name; uint8_t type; tb_uid_t suid; @@ -1760,35 +1760,19 @@ typedef struct { char cgroup[TSDB_CONSUMER_GROUP_LEN]; } SMqOffset; -typedef struct { - int32_t vgId; - SArray* offsets; // SArray -} SMqVgOffsets; - typedef struct { int32_t num; SMqOffset* offsets; -} SMqCMResetOffsetReq; +} SMqCMCommitOffsetReq; typedef struct { int32_t reserved; -} SMqCMResetOffsetRsp; - -typedef struct { - int64_t leftForVer; - SMqVgOffsets offsets; -} SMqMVResetOffsetReq; - -typedef struct { - int32_t reserved; -} SMqMVResetOffsetRsp; +} SMqCMCommitOffsetRsp; int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset); int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset); -int32_t tEncodeSMqCMResetOffsetReq(SCoder* encoder, const SMqCMResetOffsetReq* pReq); -int32_t tDecodeSMqCMResetOffsetReq(SCoder* decoder, SMqCMResetOffsetReq* pReq); -int32_t tEncodeSMqMVResetOffsetReq(SCoder* encoder, const SMqMVResetOffsetReq* pReq); -int32_t tDecodeSMqMVResetOffsetReq(SCoder* decoder, SMqMVResetOffsetReq* pReq); +int32_t tEncodeSMqCMCommitOffsetReq(SCoder* encoder, const SMqCMCommitOffsetReq* pReq); +int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq); typedef struct { uint32_t nCols; @@ -1870,7 +1854,6 @@ typedef struct { typedef struct { int64_t consumerId; SSchemaWrapper* schemas; - int64_t committedOffset; int64_t reqOffset; int64_t rspOffset; int32_t skipLogNum; @@ -1881,22 +1864,18 @@ typedef struct { // one req for one vg+topic typedef struct { SMsgHead head; - // 0: commit only, current offset - // 1: consume only, poll next offset - // 2: commit current and consume next offset - int32_t reqType; - int64_t reqId; int64_t consumerId; int64_t blockingTime; char cgroup[TSDB_CONSUMER_GROUP_LEN]; - int64_t offset; + int64_t currentOffset; char topic[TSDB_TOPIC_FNAME_LEN]; } SMqConsumeReq; typedef struct { int32_t vgId; + int64_t offset; SEpSet epSet; } SMqSubVgEp; @@ -1917,12 +1896,14 @@ static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taos static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pVgEp->vgId); + tlen += taosEncodeFixedI64(buf, pVgEp->offset); tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); return tlen; } static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { buf = taosDecodeFixedI32(buf, &pVgEp->vgId); + buf = taosDecodeFixedI64(buf, &pVgEp->offset); buf = taosDecodeSEpSet(buf, &pVgEp->epSet); return buf; } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 1a63ea73a5..18718b0c52 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -143,10 +143,10 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) - TD_DEF_MSG_TYPE(TDMT_MND_RESET_OFFSET, "mnode-reset-offset", SMqCMResetOffsetReq, SMqCMResetOffsetRsp) TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp) // Requests handled by VNODE TD_NEW_MSG_SEG(TDMT_VND_MSG) @@ -194,4 +194,3 @@ enum { TDMT_MAX #endif }; -// clang-format on diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 1e967a6d2b..bf48a8523c 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -113,14 +113,15 @@ typedef enum { SDB_USER = 7, SDB_AUTH = 8, SDB_ACCT = 9, - SDB_SUBSCRIBE = 10, - SDB_CONSUMER = 11, - SDB_TOPIC = 12, - SDB_VGROUP = 13, - SDB_STB = 14, - SDB_DB = 15, - SDB_FUNC = 16, - SDB_MAX = 17 + SDB_OFFSET = 10, + SDB_SUBSCRIBE = 11, + SDB_CONSUMER = 12, + SDB_TOPIC = 13, + SDB_VGROUP = 14, + SDB_STB = 15, + SDB_DB = 16, + SDB_FUNC = 17, + SDB_MAX = 18 } ESdbType; typedef struct SSdb SSdb; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 92028b0837..b947142958 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -20,6 +20,8 @@ extern "C" { #endif +// clang-format off + #define TAOS_DEF_ERROR_CODE(mod, code) ((int32_t)((0x80000000 | ((mod)<<16) | (code)))) #define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code)) @@ -260,6 +262,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_CONSUMER_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E7) #define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E8) #define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9) +#define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03EA) #define TSDB_CODE_MND_MQ_PLACEHOLDER TAOS_DEF_ERROR_CODE(0, 0x03F0) // dnode diff --git a/include/util/tdef.h b/include/util/tdef.h index d7e26e97d8..d0f2b77f1f 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +// clang-format off + #ifndef _TD_UTIL_DEF_H #define _TD_UTIL_DEF_H @@ -196,6 +198,7 @@ typedef enum ELogicConditionType { #define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN #define TSDB_CONSUMER_GROUP_LEN 192 #define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2) +#define TSDB_PARTITION_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2) #define TSDB_COL_NAME_LEN 65 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE @@ -349,9 +352,6 @@ typedef enum ELogicConditionType { #define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type #define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode -#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type -#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode - #define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 229d3a9ec3..9a1025c4bd 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -45,22 +45,24 @@ struct tmq_topic_vgroup_list_t { }; struct tmq_conf_t { - char clientId[256]; - char groupId[256]; - bool auto_commit; + char clientId[256]; + char groupId[256]; + int8_t auto_commit; + int8_t resetOffset; + tmq_commit_cb* commit_cb; /*char* ip;*/ /*uint16_t port;*/ - tmq_commit_cb* commit_cb; }; struct tmq_t { // conf char groupId[256]; char clientId[256]; - bool autoCommit; + int8_t autoCommit; SRWLatch lock; int64_t consumerId; int32_t epoch; + int32_t resetOffsetCfg; int64_t status; tsem_t rspSem; STscObj* pTscObj; @@ -79,7 +81,6 @@ typedef struct { // statistics int64_t pollCnt; // offset - int64_t committedOffset; int64_t currentOffset; // connection info int32_t vgId; @@ -115,21 +116,17 @@ typedef struct { } SMqConsumeCbParam; typedef struct { - tmq_t* tmq; - SMqClientVg* pVg; - int32_t async; - tsem_t rspSem; -} SMqCommitCbParam; - -typedef struct { - tmq_t* tmq; + tmq_t* tmq; + /*SMqClientVg* pVg;*/ + int32_t async; tsem_t rspSem; tmq_resp_err_t rspErr; -} SMqResetOffsetParam; +} SMqCommitCbParam; tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); conf->auto_commit = false; + conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST; return conf; } @@ -157,6 +154,20 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value return TMQ_CONF_INVALID; } } + if (strcmp(key, "auto.offset.reset") == 0) { + if (strcmp(value, "none") == 0) { + conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE; + return TMQ_CONF_OK; + } else if (strcmp(value, "earliest") == 0) { + conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; + return TMQ_CONF_OK; + } else if (strcmp(value, "latest") == 0) { + conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST; + return TMQ_CONF_OK; + } else { + return TMQ_CONF_INVALID; + } + } return TMQ_CONF_UNKNOWN; } @@ -190,14 +201,12 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { if (pParam->tmq->commit_cb) { pParam->tmq->commit_cb(pParam->tmq, rspErr, NULL, NULL); } - if (!pParam->async) tsem_post(&pParam->rspSem); - return 0; -} - -int32_t tmqResetOffsetCb(void* param, const SDataBuf* pMsg, int32_t code) { - SMqResetOffsetParam* pParam = (SMqResetOffsetParam*)param; - pParam->rspErr = code; - tsem_post(&pParam->rspSem); + if (!pParam->async) + tsem_post(&pParam->rspSem); + else { + tsem_destroy(&pParam->rspSem); + free(param); + } return 0; } @@ -216,6 +225,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs strcpy(pTmq->groupId, conf->groupId); pTmq->autoCommit = conf->auto_commit; pTmq->commit_cb = conf->commit_cb; + pTmq->resetOffsetCfg = conf->resetOffset; tsem_init(&pTmq->rspSem, 0, 0); pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1); @@ -223,18 +233,40 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs return pTmq; } -tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) { - SRequestObj* pRequest = NULL; +tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) { + // TODO: add read write lock + SRequestObj* pRequest = NULL; + tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS; // build msg // send to mnode - SMqCMResetOffsetReq req; - req.num = offsets->cnt; - req.offsets = (SMqOffset*)offsets->elems; + SMqCMCommitOffsetReq req; + SArray* pArray = NULL; + + if (offsets == NULL) { + pArray = taosArrayInit(0, sizeof(SMqOffset)); + for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + SMqOffset offset; + strcpy(offset.topicName, pTopic->topicName); + strcpy(offset.cgroup, tmq->groupId); + offset.vgId = pVg->vgId; + offset.offset = pVg->currentOffset; + taosArrayPush(pArray, &offset); + } + } + req.num = pArray->size; + req.offsets = pArray->pData; + } else { + req.num = offsets->cnt; + req.offsets = (SMqOffset*)offsets->elems; + } SCoder encoder; tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); - tEncodeSMqCMResetOffsetReq(&encoder, &req); + tEncodeSMqCMCommitOffsetReq(&encoder, &req); int32_t tlen = encoder.pos; void* buf = malloc(tlen); if (buf == NULL) { @@ -244,32 +276,41 @@ tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offse tCoderClear(&encoder); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER); - tEncodeSMqCMResetOffsetReq(&encoder, &req); + tEncodeSMqCMCommitOffsetReq(&encoder, &req); tCoderClear(&encoder); - pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_RESET_OFFSET); + pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET); if (pRequest == NULL) { tscError("failed to malloc request"); } - SMqResetOffsetParam param = {0}; - tsem_init(¶m.rspSem, 0, 0); - param.tmq = tmq; + SMqCommitCbParam* pParam = malloc(sizeof(SMqCommitCbParam)); + if (pParam == NULL) { + return -1; + } + pParam->tmq = tmq; + tsem_init(&pParam->rspSem, 0, 0); pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - sendInfo->param = ¶m; - sendInfo->fp = tmqResetOffsetCb; + sendInfo->param = pParam; + sendInfo->fp = tmqCommitCb; SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); - tsem_wait(¶m.rspSem); - tsem_destroy(¶m.rspSem); + if (!async) { + tsem_wait(&pParam->rspSem); + resp = pParam->rspErr; + } - return param.rspErr; + if (pArray) { + taosArrayDestroy(pArray); + } + + return resp; } tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { @@ -641,8 +682,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { // clang-format off SMqClientVg clientVg = { .pollCnt = 0, - .committedOffset = -1, - .currentOffset = -1, + .currentOffset = pVgEp->offset, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet }; @@ -708,23 +748,51 @@ END: return 0; } -SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, int32_t type, SMqClientTopic* pTopic, - SMqClientVg* pVg) { +tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { + const SMqOffset* pOffset = &offset->offset; + if (strcmp(pOffset->cgroup, tmq->groupId) != 0) { + return TMQ_RESP_ERR__FAIL; + } + int32_t sz = taosArrayGetSize(tmq->clientTopics); + for (int32_t i = 0; i < sz; i++) { + SMqClientTopic* clientTopic = taosArrayGet(tmq->clientTopics, i); + if (strcmp(clientTopic->topicName, pOffset->topicName) == 0) { + int32_t vgSz = taosArrayGetSize(clientTopic->vgs); + for (int32_t j = 0; j < vgSz; j++) { + SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j); + if (pVg->vgId == pOffset->vgId) { + pVg->currentOffset = pOffset->offset; + return TMQ_RESP_ERR__SUCCESS; + } + } + } + } + return TMQ_RESP_ERR__FAIL; +} + +SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClientTopic* pTopic, SMqClientVg* pVg) { + int64_t reqOffset; + if (pVg->currentOffset >= 0) { + reqOffset = pVg->currentOffset; + } else { + if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) { + tscError("unable to poll since no committed offset but reset offset is set to none"); + return NULL; + } + reqOffset = tmq->resetOffsetCfg; + } + SMqConsumeReq* pReq = malloc(sizeof(SMqConsumeReq)); if (pReq == NULL) { return NULL; } - pReq->reqType = type; + strcpy(pReq->topic, pTopic->topicName); - pReq->blockingTime = blocking_time; - pReq->consumerId = tmq->consumerId; strcpy(pReq->cgroup, tmq->groupId); - if (type == TMQ_REQ_TYPE_COMMIT_ONLY) { - pReq->offset = pVg->currentOffset; - } else { - pReq->offset = pVg->currentOffset + 1; - } + pReq->blockingTime = blocking_time; + pReq->consumerId = tmq->consumerId; + pReq->currentOffset = reqOffset; pReq->head.vgId = htonl(pVg->vgId); pReq->head.contLen = htonl(sizeof(SMqConsumeReq)); @@ -743,13 +811,13 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { if (taosArrayGetSize(tmq->clientTopics) == 0) { tscDebug("consumer:%ld poll but not assigned", tmq->consumerId); - printf("over1\n"); + /*printf("over1\n");*/ usleep(blocking_time * 1000); return NULL; } SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx); if (taosArrayGetSize(pTopic->vgs) == 0) { - printf("over2\n"); + /*printf("over2\n");*/ usleep(blocking_time * 1000); return NULL; } @@ -760,8 +828,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); /*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/ - int32_t reqType = tmq->autoCommit ? TMQ_REQ_TYPE_CONSUME_AND_COMMIT : TMQ_REQ_TYPE_CONSUME_ONLY; - SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, reqType, pTopic, pVg); + SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, pTopic, pVg); if (pReq == NULL) { ASSERT(false); usleep(blocking_time * 1000); @@ -821,6 +888,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { /*return pRequest;*/ } +#if 0 tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { if (tmq_topic_vgroup_list != NULL) { // TODO @@ -831,7 +899,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, TMQ_REQ_TYPE_COMMIT_ONLY, pTopic, pVg); + SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, pTopic, pVg); SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)}; @@ -858,6 +926,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v return 0; } +#endif void tmq_message_destroy(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 46feab7791..8101c18ebf 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -285,7 +285,7 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int32_t tlen = 0; - tlen += taosEncodeFixedU64(buf, pReq->ver); + tlen += taosEncodeFixedI64(buf, pReq->ver); tlen += taosEncodeString(buf, pReq->name); tlen += taosEncodeFixedU32(buf, pReq->ttl); tlen += taosEncodeFixedU32(buf, pReq->keep); @@ -330,7 +330,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { } void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { - buf = taosDecodeFixedU64(buf, &(pReq->ver)); + buf = taosDecodeFixedI64(buf, &(pReq->ver)); buf = taosDecodeString(buf, &(pReq->name)); buf = taosDecodeFixedU32(buf, &(pReq->ttl)); buf = taosDecodeFixedU32(buf, &(pReq->keep)); @@ -380,7 +380,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { int32_t tSerializeSVCreateTbBatchReq(void **buf, SVCreateTbBatchReq *pReq) { int32_t tlen = 0; - tlen += taosEncodeFixedU64(buf, pReq->ver); + tlen += taosEncodeFixedI64(buf, pReq->ver); tlen += taosEncodeFixedU32(buf, taosArrayGetSize(pReq->pArray)); for (size_t i = 0; i < taosArrayGetSize(pReq->pArray); i++) { SVCreateTbReq *pCreateTbReq = taosArrayGet(pReq->pArray, i); @@ -393,7 +393,7 @@ int32_t tSerializeSVCreateTbBatchReq(void **buf, SVCreateTbBatchReq *pReq) { void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) { uint32_t nsize = 0; - buf = taosDecodeFixedU64(buf, &pReq->ver); + buf = taosDecodeFixedI64(buf, &pReq->ver); buf = taosDecodeFixedU32(buf, &nsize); pReq->pArray = taosArrayInit(nsize, sizeof(SVCreateTbReq)); for (size_t i = 0; i < nsize; i++) { @@ -407,14 +407,14 @@ void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) { int32_t tSerializeSVDropTbReq(void **buf, SVDropTbReq *pReq) { int32_t tlen = 0; - tlen += taosEncodeFixedU64(buf, pReq->ver); + tlen += taosEncodeFixedI64(buf, pReq->ver); tlen += taosEncodeString(buf, pReq->name); tlen += taosEncodeFixedU8(buf, pReq->type); return tlen; } void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) { - buf = taosDecodeFixedU64(buf, &pReq->ver); + buf = taosDecodeFixedI64(buf, &pReq->ver); buf = taosDecodeString(buf, &pReq->name); buf = taosDecodeFixedU8(buf, &pReq->type); return buf; @@ -1393,7 +1393,7 @@ int32_t tSerializeSDropDbRsp(void *buf, int32_t bufLen, SDropDbRsp *pRsp) { if (tStartEncode(&encoder) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->db) < 0) return -1; - if (tEncodeU64(&encoder, pRsp->uid) < 0) return -1; + if (tEncodeI64(&encoder, pRsp->uid) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -1407,7 +1407,7 @@ int32_t tDeserializeSDropDbRsp(void *buf, int32_t bufLen, SDropDbRsp *pRsp) { if (tStartDecode(&decoder) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->db) < 0) return -1; - if (tDecodeU64(&decoder, &pRsp->uid) < 0) return -1; + if (tDecodeI64(&decoder, &pRsp->uid) < 0) return -1; tEndDecode(&decoder); tCoderClear(&decoder); @@ -1468,7 +1468,7 @@ int32_t tDeserializeSSyncDbReq(void *buf, int32_t bufLen, SSyncDbReq *pReq) { static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) { if (tEncodeCStr(pEncoder, pRsp->db) < 0) return -1; - if (tEncodeU64(pEncoder, pRsp->uid) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->uid) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->vgVersion) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->vgNum) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->hashMethod) < 0) return -1; @@ -1518,7 +1518,7 @@ int32_t tSerializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pRsp int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) { if (tDecodeCStrTo(pDecoder, pRsp->db) < 0) return -1; - if (tDecodeU64(pDecoder, &pRsp->uid) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->uid) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->vgVersion) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->vgNum) < 0) return -1; if (tDecodeI8(pDecoder, &pRsp->hashMethod) < 0) return -1; @@ -1661,7 +1661,7 @@ static int32_t tEncodeSTableMetaRsp(SCoder *pEncoder, STableMetaRsp *pRsp) { if (tEncodeCStr(pEncoder, pRsp->tbName) < 0) return -1; if (tEncodeCStr(pEncoder, pRsp->stbName) < 0) return -1; if (tEncodeCStr(pEncoder, pRsp->dbFName) < 0) return -1; - if (tEncodeU64(pEncoder, pRsp->dbId) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->dbId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->numOfTags) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->numOfColumns) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->precision) < 0) return -1; @@ -1684,7 +1684,7 @@ static int32_t tDecodeSTableMetaRsp(SCoder *pDecoder, STableMetaRsp *pRsp) { if (tDecodeCStrTo(pDecoder, pRsp->tbName) < 0) return -1; if (tDecodeCStrTo(pDecoder, pRsp->stbName) < 0) return -1; if (tDecodeCStrTo(pDecoder, pRsp->dbFName) < 0) return -1; - if (tDecodeU64(pDecoder, &pRsp->dbId) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->dbId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->numOfTags) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->numOfColumns) < 0) return -1; if (tDecodeI8(pDecoder, &pRsp->precision) < 0) return -1; @@ -2093,7 +2093,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1; if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; - if (tEncodeU64(&encoder, pReq->dbUid) < 0) return -1; + if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1; if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1; if (tEncodeI32(&encoder, pReq->cacheBlockSize) < 0) return -1; if (tEncodeI32(&encoder, pReq->totalBlocks) < 0) return -1; @@ -2133,7 +2133,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; - if (tDecodeU64(&decoder, &pReq->dbUid) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1; if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1; if (tDecodeI32(&decoder, &pReq->cacheBlockSize) < 0) return -1; if (tDecodeI32(&decoder, &pReq->totalBlocks) < 0) return -1; @@ -2171,7 +2171,7 @@ int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq) if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1; if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1; - if (tEncodeU64(&encoder, pReq->dbUid) < 0) return -1; + if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; tEndEncode(&encoder); @@ -2187,7 +2187,7 @@ int32_t tDeserializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; - if (tDecodeU64(&decoder, &pReq->dbUid) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; tEndDecode(&decoder); @@ -2356,35 +2356,7 @@ int32_t tDecodeSMqOffset(SCoder *decoder, SMqOffset *pOffset) { return 0; } -int32_t tEncodeSMqVgOffsets(SCoder *encoder, const SMqVgOffsets *pOffsets) { - if (tStartEncode(encoder) < 0) return -1; - if (tEncodeI32(encoder, pOffsets->vgId) < 0) return -1; - int32_t sz = taosArrayGetSize(pOffsets->offsets); - if (tEncodeI32(encoder, sz) < 0) return -1; - for (int32_t i = 0; i < sz; i++) { - SMqOffset *offset = taosArrayGet(pOffsets->offsets, i); - if (tEncodeSMqOffset(encoder, offset) < 0) return -1; - } - tEndEncode(encoder); - return encoder->pos; -} - -int32_t tDecodeSMqVgOffsets(SCoder *decoder, SMqVgOffsets *pOffsets) { - int32_t sz; - if (tStartDecode(decoder) < 0) return -1; - if (tDecodeI32(decoder, &pOffsets->vgId) < 0) return -1; - if (tDecodeI32(decoder, &sz) < 0) return -1; - pOffsets->offsets = taosArrayInit(sz, sizeof(SMqOffset)); - for (int32_t i = 0; i < sz; i++) { - SMqOffset offset; - if (tDecodeSMqOffset(decoder, &offset) < 0) return -1; - taosArrayPush(pOffsets->offsets, &offset); - } - tEndDecode(decoder); - return 0; -} - -int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *pReq) { +int32_t tEncodeSMqCMCommitOffsetReq(SCoder *encoder, const SMqCMCommitOffsetReq *pReq) { if (tStartEncode(encoder) < 0) return -1; if (tEncodeI32(encoder, pReq->num) < 0) return -1; for (int32_t i = 0; i < pReq->num; i++) { @@ -2394,7 +2366,7 @@ int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *p return encoder->pos; } -int32_t tDecodeSMqCMResetOffsetReq(SCoder *decoder, SMqCMResetOffsetReq *pReq) { +int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq) { if (tStartDecode(decoder) < 0) return -1; if (tDecodeI32(decoder, &pReq->num) < 0) return -1; pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder); @@ -2405,23 +2377,3 @@ int32_t tDecodeSMqCMResetOffsetReq(SCoder *decoder, SMqCMResetOffsetReq *pReq) { tEndDecode(decoder); return 0; } - -#if 0 -int32_t tEncodeSMqMVResetOffsetReq(SCoder *encoder, const SMqMVResetOffsetReq *pReq) { - if (tEncodeI64(encoder, pReq->leftForVer) < 0) return -1; - for (int32_t i = 0; i < pReq->num; i++) { - tEncodeSMqOffset(encoder, &pReq->offsets[i]); - } - return encoder->pos; -} - -int32_t tDecodeSMqMVResetOffsetReq(SCoder *decoder, SMqMVResetOffsetReq *pReq) { - if (tDecodeI32(decoder, &pReq->num) < 0) return -1; - pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder); - if (pReq->offsets == NULL) return -1; - for (int32_t i = 0; i < pReq->num; i++) { - tDecodeSMqOffset(decoder, &pReq->offsets[i]); - } - return 0; -} -#endif diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 4e35baf905..0aae145d2f 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -25,8 +25,8 @@ #include "dndMnode.h" #include "dndVnodes.h" -#define INTERNAL_USER "_dnd" -#define INTERNAL_CKEY "_key" +#define INTERNAL_USER "_dnd" +#define INTERNAL_CKEY "_key" #define INTERNAL_SECRET "_pwd" static void dndInitMsgFp(STransMgmt *pMgmt) { @@ -113,6 +113,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_MQ_COMMIT_OFFSET)] = dndProcessMnodeWriteMsg; /*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/ pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = dndProcessMnodeWriteMsg; @@ -155,7 +156,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { } static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { - SDnode * pDnode = parent; + SDnode *pDnode = parent; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pRsp->msgType; @@ -219,7 +220,7 @@ static void dndCleanupClient(SDnode *pDnode) { } static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { - SDnode * pDnode = param; + SDnode *pDnode = param; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pReq->msgType; @@ -313,7 +314,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char SAuthReq authReq = {0}; tstrncpy(authReq.user, user, TSDB_USER_LEN); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); - void * pReq = rpcMallocCont(contLen); + void *pReq = rpcMallocCont(contLen); tSerializeSAuthReq(pReq, contLen, &authReq); SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index e64191f715..bb2367ef72 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -123,6 +123,7 @@ typedef enum { TRN_TYPE_DROP_TOPIC = 1015, TRN_TYPE_SUBSCRIBE = 1016, TRN_TYPE_REBALANCE = 1017, + TRN_TYPE_COMMIT_OFFSET = 1018, TRN_TYPE_BASIC_SCOPE_END, TRN_TYPE_GLOBAL_SCOPE = 2000, TRN_TYPE_CREATE_DNODE = 2001, @@ -176,7 +177,7 @@ typedef struct { SArray* undoActions; int64_t createdTime; int64_t lastExecTime; - uint64_t dbUid; + int64_t dbUid; char dbname[TSDB_DB_FNAME_LEN]; char lastError[TSDB_TRANS_ERROR_LEN]; } STrans; @@ -304,16 +305,16 @@ typedef struct { } SDbCfg; typedef struct { - char name[TSDB_DB_FNAME_LEN]; - char acct[TSDB_USER_LEN]; - char createUser[TSDB_USER_LEN]; - int64_t createdTime; - int64_t updateTime; - uint64_t uid; - int32_t cfgVersion; - int32_t vgVersion; - int8_t hashMethod; // default is 1 - SDbCfg cfg; + char name[TSDB_DB_FNAME_LEN]; + char acct[TSDB_USER_LEN]; + char createUser[TSDB_USER_LEN]; + int64_t createdTime; + int64_t updateTime; + int64_t uid; + int32_t cfgVersion; + int32_t vgVersion; + int8_t hashMethod; // default is 1 + SDbCfg cfg; } SDbObj; typedef struct { @@ -346,8 +347,8 @@ typedef struct { char db[TSDB_DB_FNAME_LEN]; int64_t createdTime; int64_t updateTime; - uint64_t uid; - uint64_t dbUid; + int64_t uid; + int64_t dbUid; int32_t version; int32_t nextColId; int32_t numOfColumns; @@ -465,6 +466,24 @@ static FORCE_INLINE void tDeleteSMqSubConsumer(SMqSubConsumer* pSubConsumer) { } } +typedef struct { + char key[TSDB_PARTITION_KEY_LEN]; + int64_t offset; +} SMqOffsetObj; + +static FORCE_INLINE int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset) { + int32_t tlen = 0; + tlen += taosEncodeString(buf, pOffset->key); + tlen += taosEncodeFixedI64(buf, pOffset->offset); + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset) { + buf = taosDecodeStringTo(buf, pOffset->key); + buf = taosDecodeFixedI64(buf, &pOffset->offset); + return buf; +} + typedef struct { char key[TSDB_SUBSCRIBE_KEY_LEN]; int32_t status; diff --git a/source/dnode/mnode/impl/inc/mndOffset.h b/source/dnode/mnode/impl/inc/mndOffset.h new file mode 100644 index 0000000000..e18eec973f --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndOffset.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_OFFSET_H_ +#define _TD_MND_OFFSET_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mndInitOffset(SMnode *pMnode); +void mndCleanupOffset(SMnode *pMnode); + +SMqOffsetObj *mndAcquireOffset(SMnode *pMnode, const char *key); +void mndReleaseOffset(SMnode *pMnode, SMqOffsetObj *pOffset); + +SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset); +SSdbRow *mndOffsetActionDecode(SSdbRaw *pRaw); + +int32_t mndCreateOffset(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs); + +static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, const char *topicName, int32_t vgId) { + return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName); +} + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_OFFSET_H_*/ diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c new file mode 100644 index 0000000000..ac9e99ebd4 --- /dev/null +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -0,0 +1,224 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "mndOffset.h" +#include "mndAuth.h" +#include "mndDb.h" +#include "mndDnode.h" +#include "mndMnode.h" +#include "mndShow.h" +#include "mndStb.h" +#include "mndTrans.h" +#include "mndUser.h" +#include "mndVgroup.h" +#include "tname.h" + +#define MND_OFFSET_VER_NUMBER 1 +#define MND_OFFSET_RESERVE_SIZE 64 + +static int32_t mndOffsetActionInsert(SSdb *pSdb, SMqOffsetObj *pOffset); +static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset); +static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOffset, SMqOffsetObj *pNewOffset); +static int32_t mndProcessCommitOffsetReq(SMnodeMsg *pReq); + +int32_t mndInitOffset(SMnode *pMnode) { + SSdbTable table = {.sdbType = SDB_OFFSET, + .keyType = SDB_KEY_BINARY, + .encodeFp = (SdbEncodeFp)mndOffsetActionEncode, + .decodeFp = (SdbDecodeFp)mndOffsetActionDecode, + .insertFp = (SdbInsertFp)mndOffsetActionInsert, + .updateFp = (SdbUpdateFp)mndOffsetActionUpdate, + .deleteFp = (SdbDeleteFp)mndOffsetActionDelete}; + + mndSetMsgHandle(pMnode, TDMT_MND_MQ_COMMIT_OFFSET, mndProcessCommitOffsetReq); + + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupOffset(SMnode *pMnode) {} + +SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + void *buf = NULL; + int32_t tlen = tEncodeSMqOffsetObj(NULL, pOffset); + int32_t size = sizeof(int32_t) + tlen + MND_OFFSET_RESERVE_SIZE; + + SSdbRaw *pRaw = sdbAllocRaw(SDB_OFFSET, MND_OFFSET_VER_NUMBER, size); + if (pRaw == NULL) goto OFFSET_ENCODE_OVER; + + buf = malloc(tlen); + if (buf == NULL) goto OFFSET_ENCODE_OVER; + + void *abuf = buf; + tEncodeSMqOffsetObj(&abuf, pOffset); + + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, tlen, OFFSET_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OFFSET_ENCODE_OVER); + SDB_SET_RESERVE(pRaw, dataPos, MND_OFFSET_RESERVE_SIZE, OFFSET_ENCODE_OVER); + SDB_SET_DATALEN(pRaw, dataPos, OFFSET_ENCODE_OVER); + + terrno = TSDB_CODE_SUCCESS; + +OFFSET_ENCODE_OVER: + tfree(buf); + if (terrno != TSDB_CODE_SUCCESS) { + mError("offset:%s, failed to encode to raw:%p since %s", pOffset->key, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("offset:%s, encode to raw:%p, row:%p", pOffset->key, pRaw, pOffset); + return pRaw; +} + +SSdbRow *mndOffsetActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + void *buf = NULL; + + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto OFFSET_DECODE_OVER; + + if (sver != MND_OFFSET_VER_NUMBER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + goto OFFSET_DECODE_OVER; + } + + int32_t size = sizeof(SMqOffsetObj); + SSdbRow *pRow = sdbAllocRow(size); + if (pRow == NULL) goto OFFSET_DECODE_OVER; + + SMqOffsetObj *pOffset = sdbGetRowObj(pRow); + if (pOffset == NULL) goto OFFSET_DECODE_OVER; + + int32_t dataPos = 0; + int32_t tlen; + SDB_GET_INT32(pRaw, dataPos, &tlen, OFFSET_DECODE_OVER); + buf = malloc(tlen + 1); + if (buf == NULL) goto OFFSET_DECODE_OVER; + SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OFFSET_DECODE_OVER); + SDB_GET_RESERVE(pRaw, dataPos, MND_OFFSET_RESERVE_SIZE, OFFSET_DECODE_OVER); + + if (tDecodeSMqOffsetObj(buf, pOffset) == NULL) { + goto OFFSET_DECODE_OVER; + } + + terrno = TSDB_CODE_SUCCESS; + +OFFSET_DECODE_OVER: + tfree(buf); + if (terrno != TSDB_CODE_SUCCESS) { + mError("offset:%s, failed to decode from raw:%p since %s", pOffset->key, pRaw, terrstr()); + tfree(pRow); + return NULL; + } + + mTrace("offset:%s, decode from raw:%p, row:%p", pOffset->key, pRaw, pOffset); + return pRow; +} + +int32_t mndCreateOffset(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs) { + int32_t code = 0; + int32_t sz = taosArrayGetSize(vgs); + for (int32_t i = 0; i < sz; i++) { + SMqConsumerEp *pConsumerEp = taosArrayGet(vgs, i); + SMqOffsetObj offsetObj; + if (mndMakePartitionKey(offsetObj.key, cgroup, topicName, pConsumerEp->vgId) < 0) { + return -1; + } + offsetObj.offset = -1; + SSdbRaw *pOffsetRaw = mndOffsetActionEncode(&offsetObj); + if (pOffsetRaw == NULL) { + return -1; + } + sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY); + if (mndTransAppendRedolog(pTrans, pOffsetRaw) < 0) { + return -1; + } + } + return 0; +} + +static int32_t mndProcessCommitOffsetReq(SMnodeMsg *pMsg) { + char key[TSDB_PARTITION_KEY_LEN]; + + SMnode *pMnode = pMsg->pMnode; + char *msgStr = pMsg->rpcMsg.pCont; + SMqCMCommitOffsetReq commitOffsetReq; + SCoder decoder; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, msgStr, pMsg->rpcMsg.contLen, TD_DECODER); + + tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq); + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_COMMIT_OFFSET, &pMsg->rpcMsg); + + for (int32_t i = 0; i < commitOffsetReq.num; i++) { + SMqOffset *pOffset = &commitOffsetReq.offsets[i]; + if (mndMakePartitionKey(key, pOffset->cgroup, pOffset->topicName, pOffset->vgId) < 0) { + return -1; + } + SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key); + ASSERT(pOffsetObj); + pOffsetObj->offset = pOffset->offset; + SSdbRaw *pOffsetRaw = mndOffsetActionEncode(pOffsetObj); + sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY); + mndTransAppendRedolog(pTrans, pOffsetRaw); + mndReleaseOffset(pMnode, pOffsetObj); + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("mq-commit-offset-trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + mndTransDrop(pTrans); + return TSDB_CODE_MND_ACTION_IN_PROGRESS; +} + +static int32_t mndOffsetActionInsert(SSdb *pSdb, SMqOffsetObj *pOffset) { + mTrace("offset:%s, perform insert action", pOffset->key); + return 0; +} + +static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset) { + mTrace("offset:%s, perform delete action", pOffset->key); + return 0; +} + +static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOldOffset, SMqOffsetObj *pNewOffset) { + mTrace("offset:%s, perform update action", pOldOffset->key); + return 0; +} + +SMqOffsetObj *mndAcquireOffset(SMnode *pMnode, const char *key) { + SSdb *pSdb = pMnode->pSdb; + SMqOffsetObj *pOffset = sdbAcquire(pSdb, SDB_OFFSET, key); + if (pOffset == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { + terrno = TSDB_CODE_MND_OFFSET_NOT_EXIST; + } + return pOffset; +} + +void mndReleaseOffset(SMnode *pMnode, SMqOffsetObj *pOffset) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pOffset); +} + +static void mndCancelGetNextOffset(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index a5c4c41244..2ea157fea4 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -19,6 +19,7 @@ #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" +#include "mndOffset.h" #include "mndShow.h" #include "mndStb.h" #include "mndTopic.h" @@ -80,13 +81,13 @@ int32_t mndInitSubscribe(SMnode *pMnode) { return sdbSetTable(pMnode->pSdb, table); } -static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *consumerGroup) { +static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *cgroup) { SMqSubscribeObj *pSub = tNewSubscribeObj(); if (pSub == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - char *key = mndMakeSubscribeKey(consumerGroup, pTopic->name); + char *key = mndMakeSubscribeKey(cgroup, pTopic->name); if (key == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tDeleteSMqSubscribeObj(pSub); @@ -289,9 +290,15 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { strcpy(topicEp.topic, topicName); topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp)); for (int32_t k = 0; k < vgsz; k++) { + char offsetKey[TSDB_PARTITION_KEY_LEN]; SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k); - - SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId}; + SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId, .offset = -1}; + mndMakePartitionKey(offsetKey, pConsumer->cgroup, topicName, pConsumerEp->vgId); + SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey); + if (pOffsetObj != NULL) { + vgEp.offset = pOffsetObj->offset; + mndReleaseOffset(pMnode, pOffsetObj); + } taosArrayPush(topicEp.vgs, &vgEp); } taosArrayPush(rsp.topics, &topicEp); @@ -870,7 +877,7 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { SUB_ENCODE_OVER: tfree(buf); - if (terrno != 0) { + if (terrno != TSDB_CODE_SUCCESS) { mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr()); sdbFreeRaw(pRaw); return NULL; @@ -1085,6 +1092,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { mDebug("create new subscription by consumer %ld, group: %s, topic %s", consumerId, cgroup, newTopicName); pSub = mndCreateSubscription(pMnode, pTopic, cgroup); createSub = true; + + mndCreateOffset(pTrans, cgroup, newTopicName, pSub->unassignedVg); } SMqSubConsumer mqSubConsumer; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 64b4aa6dd7..5b5bf661e5 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -23,6 +23,7 @@ #include "mndDnode.h" #include "mndFunc.h" #include "mndMnode.h" +#include "mndOffset.h" #include "mndProfile.h" #include "mndQnode.h" #include "mndShow.h" @@ -77,7 +78,7 @@ static void mndTransReExecute(void *param, void *tmrId) { SMnode *pMnode = param; if (mndIsMaster(pMnode)) { int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pReq, .contLen = contLen}; pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); } @@ -89,7 +90,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { SMnode *pMnode = param; if (mndIsMaster(pMnode)) { int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg); } @@ -197,6 +198,7 @@ static int32_t mndInitSteps(SMnode *pMnode) { if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1; if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1; if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1; if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1; if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; @@ -440,7 +442,7 @@ void mndProcessMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; int32_t code = 0; tmsg_t msgType = pMsg->rpcMsg.msgType; - void * ahandle = pMsg->rpcMsg.ahandle; + void *ahandle = pMsg->rpcMsg.ahandle; bool isReq = (msgType & 1U); mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4dc46b3798..ac9dde3597 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -207,9 +207,17 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { SMqConsumeReq* pReq = pMsg->pCont; int64_t consumerId = pReq->consumerId; - int64_t fetchOffset = pReq->offset; + int64_t fetchOffset; /*int64_t blockingTime = pReq->blockingTime;*/ + if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { + fetchOffset = 0; + } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) { + fetchOffset = walGetLastVer(pTq->pWal); + } else { + fetchOffset = pReq->currentOffset + 1; + } + SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL}; STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); @@ -226,31 +234,9 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ASSERT(strcmp(pTopic->topicName, pReq->topic) == 0); ASSERT(pConsumer->consumerId == consumerId); - if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) { - pTopic->committedOffset = pReq->offset; - /*printf("offset %ld committed\n", pTopic->committedOffset);*/ - pMsg->pCont = NULL; - pMsg->contLen = 0; - pMsg->code = 0; - rpcSendResponse(pMsg); - return 0; - } - - if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) { - if (pTopic->committedOffset < pReq->offset - 1) { - pTopic->committedOffset = pReq->offset - 1; - /*printf("offset %ld committed\n", pTopic->committedOffset);*/ - } - } - - rsp.committedOffset = pTopic->committedOffset; - rsp.reqOffset = pReq->offset; + rsp.reqOffset = pReq->currentOffset; rsp.skipLogNum = 0; - if (fetchOffset <= pTopic->committedOffset) { - fetchOffset = pTopic->committedOffset + 1; - } - SWalHead* pHead; while (1) { int8_t pos = fetchOffset % TQ_BUFFER_SIZE; diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index ade280eecc..c3947da459 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -22,14 +22,15 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); - // ser request version + // set request version void *pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int64_t ver = pVnode->state.processed++; taosEncodeFixedI64(&pBuf, ver); if (walWrite(pVnode->pWal, ver, pMsg->msgType, pMsg->pCont, pMsg->contLen) < 0) { - /*ASSERT(false);*/ // TODO: handle error + /*ASSERT(false);*/ + vError("vnode:%d write wal error since %s", pVnode->vgId, terrstr()); } } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 18485249b3..4078ee9291 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -35,7 +35,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) } else { SStreamBlockScanInfo* pInfo = pOperator->info; if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { - qError("submit msg error while set stream msg, %s" PRIx64, id); + qError("submit msg messed up when initing stream block, %s" PRIx64, id); return TSDB_CODE_QRY_APP_ERROR; } return TSDB_CODE_SUCCESS;