diff --git a/.clang-format b/.clang-format index f60fd3cb26..e58d518b3b 100644 --- a/.clang-format +++ b/.clang-format @@ -5,6 +5,7 @@ AccessModifierOffset: -1 AlignAfterOpenBracket: Align AlignConsecutiveAssignments: false AlignConsecutiveDeclarations: true +AlignConsecutiveMacros: true AlignEscapedNewlinesLeft: true AlignOperands: true AlignTrailingComments: true @@ -86,6 +87,5 @@ SpacesInSquareBrackets: false Standard: Auto TabWidth: 8 UseTab: Never -AlignConsecutiveDeclarations: true ... diff --git a/example/src/tmq.c b/example/src/tmq.c index 5858282aab..d8c90a0127 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -88,11 +88,6 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "group.id", "tg2"); tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); return tmq; - - tmq_list_t* topic_list = tmq_list_new(); - tmq_list_append(topic_list, "test_stb_topic_1"); - tmq_subscribe(tmq, topic_list); - return NULL; } tmq_list_t* build_topic_list() { @@ -109,12 +104,12 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { printf("subscribe err\n"); return; } - int32_t cnt = 0; + /*int32_t cnt = 0;*/ /*clock_t startTime = clock();*/ while (running) { tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); if (tmqmessage) { - cnt++; + /*cnt++;*/ msg_process(tmqmessage); tmq_message_destroy(tmqmessage); /*} else {*/ @@ -192,12 +187,15 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) { fprintf(stderr, "%% Consumer closed\n"); } -int main() { +int main(int argc, char* argv[]) { int code; - code = init_env(); + if (argc > 1) { + printf("env init\n"); + code = init_env(); + } tmq_t* tmq = build_consumer(); tmq_list_t* topic_list = build_topic_list(); - perf_loop(tmq, topic_list); - /*basic_consume_loop(tmq, topic_list);*/ + /*perf_loop(tmq, topic_list);*/ + basic_consume_loop(tmq, topic_list); /*sync_consume_loop(tmq, topic_list);*/ } diff --git a/include/client/taos.h b/include/client/taos.h index 9b676d1286..a960d37937 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -31,26 +31,26 @@ typedef void TAOS_SUB; typedef void **TAOS_ROW; // Data type definition -#define TSDB_DATA_TYPE_NULL 0 // 1 bytes -#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes -#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte -#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes -#define TSDB_DATA_TYPE_INT 4 // 4 bytes -#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes -#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes -#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes -#define TSDB_DATA_TYPE_BINARY 8 // string, alias for varchar +#define TSDB_DATA_TYPE_NULL 0 // 1 bytes +#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes +#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte +#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes +#define TSDB_DATA_TYPE_INT 4 // 4 bytes +#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes +#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes +#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes +#define TSDB_DATA_TYPE_BINARY 8 // string, alias for varchar #define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes -#define TSDB_DATA_TYPE_NCHAR 10 // unicode string -#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte +#define TSDB_DATA_TYPE_NCHAR 10 // unicode string +#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte #define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes -#define TSDB_DATA_TYPE_UINT 13 // 4 bytes -#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes -#define TSDB_DATA_TYPE_VARCHAR 15 // string +#define TSDB_DATA_TYPE_UINT 13 // 4 bytes +#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes +#define TSDB_DATA_TYPE_VARCHAR 15 // string #define TSDB_DATA_TYPE_VARBINARY 16 // binary -#define TSDB_DATA_TYPE_JSON 17 // json -#define TSDB_DATA_TYPE_DECIMAL 18 // decimal -#define TSDB_DATA_TYPE_BLOB 19 // binary +#define TSDB_DATA_TYPE_JSON 17 // json +#define TSDB_DATA_TYPE_DECIMAL 18 // decimal +#define TSDB_DATA_TYPE_BLOB 19 // binary typedef enum { TSDB_OPTION_LOCALE, diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 92e5651cd3..c6bd61cfda 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1274,28 +1274,12 @@ _err: return NULL; } -// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization / +// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or // deserialization typedef struct { - // SArray* rebSubscribes; //SArray SHashObj* rebSubHash; // SHashObj } SMqDoRebalanceMsg; -#if 0 -static FORCE_INLINE SMqDoRebalanceMsg* tNewSMqDoRebalanceMsg() { - SMqDoRebalanceMsg *pMsg = malloc(sizeof(SMqDoRebalanceMsg)); - if (pMsg == NULL) { - return NULL; - } - pMsg->rebSubscribes = taosArrayInit(0, sizeof(SMqRebSubscribe)); - if (pMsg->rebSubscribes == NULL) { - free(pMsg); - return NULL; - } - return pMsg; -} -#endif - typedef struct { int64_t status; } SMVSubscribeRsp; @@ -1779,12 +1763,6 @@ typedef struct { int32_t vgId; int64_t oldConsumerId; int64_t newConsumerId; - // char topicName[TSDB_TOPIC_FNAME_LEN]; - // char cgroup[TSDB_CONSUMER_GROUP_LEN]; - // char* sql; - // char* logicalPlan; - // char* physicalPlan; - // char* qmsg; } SMqMVRebReq; static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) { @@ -1793,13 +1771,6 @@ static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pR tlen += taosEncodeFixedI32(buf, pReq->vgId); tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId); tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); - // tlen += taosEncodeString(buf, pReq->topicName); - // tlen += taosEncodeString(buf, pReq->cgroup); - // tlen += taosEncodeString(buf, pReq->sql); - // tlen += taosEncodeString(buf, pReq->logicalPlan); - // tlen += taosEncodeString(buf, pReq->physicalPlan); - // tlen += taosEncodeString(buf, pReq->qmsg); - // tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); return tlen; } @@ -1808,13 +1779,6 @@ static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) { buf = taosDecodeFixedI32(buf, &pReq->vgId); buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId); buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); - // buf = taosDecodeStringTo(buf, pReq->topicName); - // buf = taosDecodeStringTo(buf, pReq->cgroup); - // buf = taosDecodeString(buf, &pReq->sql); - // buf = taosDecodeString(buf, &pReq->logicalPlan); - // buf = taosDecodeString(buf, &pReq->physicalPlan); - // buf = taosDecodeString(buf, &pReq->qmsg); - // buf = tDecodeSSubQueryMsg(buf, &pReq->msg); return buf; } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index cf890f5cbb..7eb8861535 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +// clang-format off + #if 0 #undef TD_MSG_INFO_ #undef TD_MSG_NUMBER_ @@ -191,3 +193,4 @@ enum { TDMT_MAX #endif }; +// clang-format on diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 315a632180..b57a985618 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -33,6 +33,7 @@ struct tmq_list_t { int32_t tot; char* elems[]; }; + struct tmq_topic_vgroup_t { char* topic; int32_t vgId; @@ -48,14 +49,17 @@ struct tmq_topic_vgroup_list_t { struct tmq_conf_t { char clientId[256]; char groupId[256]; + bool auto_commit; /*char* ip;*/ /*uint16_t port;*/ tmq_commit_cb* commit_cb; }; struct tmq_t { + // conf char groupId[256]; char clientId[256]; + bool autoCommit; SRWLatch lock; int64_t consumerId; int32_t epoch; @@ -94,25 +98,25 @@ typedef struct SMqClientTopic { SArray* vgs; // SArray } SMqClientTopic; -typedef struct SMqSubscribeCbParam { +typedef struct { tmq_t* tmq; tsem_t rspSem; tmq_resp_err_t rspErr; } SMqSubscribeCbParam; -typedef struct SMqAskEpCbParam { +typedef struct { tmq_t* tmq; int32_t wait; } SMqAskEpCbParam; -typedef struct SMqConsumeCbParam { +typedef struct { tmq_t* tmq; SMqClientVg* pVg; tmq_message_t** retMsg; tsem_t rspSem; } SMqConsumeCbParam; -typedef struct SMqCommitCbParam { +typedef struct { tmq_t* tmq; SMqClientVg* pVg; int32_t async; @@ -121,6 +125,7 @@ typedef struct SMqCommitCbParam { tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); + conf->auto_commit = false; return conf; } @@ -131,11 +136,24 @@ void tmq_conf_destroy(tmq_conf_t* conf) { tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { if (strcmp(key, "group.id") == 0) { strcpy(conf->groupId, value); + return TMQ_CONF_OK; } if (strcmp(key, "client.id") == 0) { strcpy(conf->clientId, value); + return TMQ_CONF_OK; } - return TMQ_CONF_OK; + if (strcmp(key, "enable.auto.commit") == 0) { + if (strcmp(value, "true") == 0) { + conf->auto_commit = true; + return TMQ_CONF_OK; + } else if (strcmp(value, "false") == 0) { + conf->auto_commit = false; + return TMQ_CONF_OK; + } else { + return TMQ_CONF_INVALID; + } + } + return TMQ_CONF_UNKNOWN; } tmq_list_t* tmq_list_new() { @@ -182,11 +200,14 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs pTmq->pollCnt = 0; pTmq->epoch = 0; taosInitRWLatch(&pTmq->lock); + // set conf strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); + pTmq->autoCommit = conf->auto_commit; pTmq->commit_cb = conf->commit_cb; + tsem_init(&pTmq->rspSem, 0, 0); - pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1); + pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); return pTmq; } @@ -563,8 +584,15 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg)); for (int32_t j = 0; j < vgSz; j++) { SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); + // clang-format off SMqClientVg clientVg = { - .pollCnt = 0, .committedOffset = -1, .currentOffset = -1, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet}; + .pollCnt = 0, + .committedOffset = -1, + .currentOffset = -1, + .vgId = pVgEp->vgId, + .epSet = pVgEp->epSet + }; + // clang-format on taosArrayPush(topic.vgs, &clientVg); set = true; } @@ -655,7 +683,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { int64_t status = atomic_load_64(&tmq->status); tmqAskEp(tmq, status == 0); - if (blocking_time < 0) blocking_time = 1; + if (blocking_time <= 0) blocking_time = 1; if (blocking_time > 1000) blocking_time = 1000; /*blocking_time = 1;*/ @@ -676,7 +704,8 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); /*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/ - SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, TMQ_REQ_TYPE_CONSUME_ONLY, pTopic, pVg); + int32_t reqType = tmq->autoCommit ? TMQ_REQ_TYPE_CONSUME_AND_COMMIT : TMQ_REQ_TYPE_COMMIT_ONLY; + SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, reqType, pTopic, pVg); if (pReq == NULL) { ASSERT(false); usleep(blocking_time * 1000); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e151ec9ae2..ee12cd6f62 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -248,7 +248,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { switch (pReq->type) { case TD_SUPER_TABLE: - tlen += taosEncodeFixedU64(buf, pReq->stbCfg.suid); + tlen += taosEncodeFixedI64(buf, pReq->stbCfg.suid); tlen += taosEncodeFixedU32(buf, pReq->stbCfg.nCols); for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) { tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type); @@ -265,7 +265,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { } break; case TD_CHILD_TABLE: - tlen += taosEncodeFixedU64(buf, pReq->ctbCfg.suid); + tlen += taosEncodeFixedI64(buf, pReq->ctbCfg.suid); tlen += tdEncodeKVRow(buf, pReq->ctbCfg.pTag); break; case TD_NORMAL_TABLE: @@ -293,7 +293,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { switch (pReq->type) { case TD_SUPER_TABLE: - buf = taosDecodeFixedU64(buf, &(pReq->stbCfg.suid)); + buf = taosDecodeFixedI64(buf, &(pReq->stbCfg.suid)); buf = taosDecodeFixedU32(buf, &(pReq->stbCfg.nCols)); pReq->stbCfg.pSchema = (SSchema *)malloc(pReq->stbCfg.nCols * sizeof(SSchema)); for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) { @@ -312,7 +312,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { } break; case TD_CHILD_TABLE: - buf = taosDecodeFixedU64(buf, &pReq->ctbCfg.suid); + buf = taosDecodeFixedI64(buf, &pReq->ctbCfg.suid); buf = tdDecodeKVRow(buf, &pReq->ctbCfg.pTag); break; case TD_NORMAL_TABLE: @@ -385,14 +385,14 @@ int32_t tSerializeSMCreateStbReq(void **buf, SMCreateStbReq *pReq) { for (int32_t i = 0; i < pReq->numOfColumns; ++i) { SField *pField = taosArrayGet(pReq->pColumns, i); - tlen += taosEncodeFixedI8(buf, pField->type); + tlen += taosEncodeFixedU8(buf, pField->type); tlen += taosEncodeFixedI32(buf, pField->bytes); tlen += taosEncodeString(buf, pField->name); } for (int32_t i = 0; i < pReq->numOfTags; ++i) { SField *pField = taosArrayGet(pReq->pTags, i); - tlen += taosEncodeFixedI8(buf, pField->type); + tlen += taosEncodeFixedU8(buf, pField->type); tlen += taosEncodeFixedI32(buf, pField->bytes); tlen += taosEncodeString(buf, pField->name); } @@ -416,7 +416,7 @@ void *tDeserializeSMCreateStbReq(void *buf, SMCreateStbReq *pReq) { for (int32_t i = 0; i < pReq->numOfColumns; ++i) { SField field = {0}; - buf = taosDecodeFixedI8(buf, &field.type); + buf = taosDecodeFixedU8(buf, &field.type); buf = taosDecodeFixedI32(buf, &field.bytes); buf = taosDecodeStringTo(buf, field.name); if (taosArrayPush(pReq->pColumns, &field) == NULL) { @@ -427,7 +427,7 @@ void *tDeserializeSMCreateStbReq(void *buf, SMCreateStbReq *pReq) { for (int32_t i = 0; i < pReq->numOfTags; ++i) { SField field = {0}; - buf = taosDecodeFixedI8(buf, &field.type); + buf = taosDecodeFixedU8(buf, &field.type); buf = taosDecodeFixedI32(buf, &field.bytes); buf = taosDecodeStringTo(buf, field.name); if (taosArrayPush(pReq->pTags, &field) == NULL) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 2a3e0008a2..63689fe66e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -28,7 +28,7 @@ #include "tcompare.h" #include "tname.h" -#define MND_SUBSCRIBE_VER_NUMBER 1 +#define MND_SUBSCRIBE_VER_NUMBER 1 #define MND_SUBSCRIBE_RESERVE_SIZE 64 #define MND_SUBSCRIBE_REBALANCE_CNT 3 diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 252bc889f5..cad713f588 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -228,6 +228,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) { pTopic->committedOffset = pReq->offset; + printf("offset %ld committed\n", pTopic->committedOffset); pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = 0; @@ -236,17 +237,27 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { } if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) { - pTopic->committedOffset = pReq->offset - 1; + if (pTopic->committedOffset < pReq->offset - 1) { + pTopic->committedOffset = pReq->offset - 1; + printf("offset %ld committed\n", pTopic->committedOffset); + } } rsp.committedOffset = pTopic->committedOffset; rsp.reqOffset = pReq->offset; rsp.skipLogNum = 0; + if (fetchOffset <= pTopic->committedOffset) { + fetchOffset = pTopic->committedOffset + 1; + } + SWalHead* pHead; while (1) { int8_t pos = fetchOffset % TQ_BUFFER_SIZE; if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { + // TODO: no more log, set timer to wait blocking time + // if data inserted during waiting, launch query and + // rsponse to user break; } pHead = pTopic->pReadhandle->pHead; @@ -263,6 +274,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { } if (pDataBlock == NULL) { fetchOffset++; + pos = fetchOffset % TQ_BUFFER_SIZE; rsp.skipLogNum++; break; }