From f36f1a82167020f5b7a5be6727a10f12cde61df7 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 15 Feb 2022 14:57:42 +0800 Subject: [PATCH 1/2] temporary remove multi topic consume --- example/src/tmq.c | 70 +++++---- include/common/tmsg.h | 221 ++++++++++++++--------------- source/dnode/vnode/src/tq/tq.c | 117 ++++++++++++++- source/dnode/vnode/src/tq/tqRead.c | 3 + 4 files changed, 257 insertions(+), 154 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index 99e0c443dd..bad789d0e1 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -13,16 +13,14 @@ * along with this program. If not, see . */ +#include #include #include -#include #include #include "taos.h" -static int running = 1; -static void msg_process(tmq_message_t* message) { - tmqShowMsg(message); -} +static int running = 1; +static void msg_process(tmq_message_t* message) { tmqShowMsg(message); } int32_t init_env() { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -44,29 +42,28 @@ int32_t init_env() { } taos_free_result(pRes); - /*pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");*/ - /*if (taos_errno(pRes) != 0) {*/ - /*printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));*/ - /*return -1;*/ - /*}*/ - /*taos_free_result(pRes);*/ + pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); - /*pRes = taos_query(pConn, "create table if not exists tu using st1 tags(1)");*/ - /*if (taos_errno(pRes) != 0) {*/ - /*printf("failed to create child table tu, reason:%s\n", taos_errstr(pRes));*/ - /*return -1;*/ - /*}*/ - /*taos_free_result(pRes);*/ + pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags(1)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); - /*pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)");*/ - /*if (taos_errno(pRes) != 0) {*/ - /*printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));*/ - /*return -1;*/ - /*}*/ - /*taos_free_result(pRes);*/ + pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); - - const char* sql = "select * from st1"; + const char* sql = "select * from tu2"; pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); if (taos_errno(pRes) != 0) { printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes)); @@ -95,7 +92,7 @@ tmq_t* build_consumer() { tmq_list_t* topic_list = tmq_list_new(); tmq_list_append(topic_list, "test_stb_topic_1"); tmq_subscribe(tmq, topic_list); - return NULL; + return NULL; } tmq_list_t* build_topic_list() { @@ -104,8 +101,7 @@ tmq_list_t* build_topic_list() { return topic_list; } -void basic_consume_loop(tmq_t *tmq, - tmq_list_t *topics) { +void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { tmq_resp_err_t err; if ((err = tmq_subscribe(tmq, topics))) { @@ -116,12 +112,12 @@ void basic_consume_loop(tmq_t *tmq, int32_t cnt = 0; /*clock_t startTime = clock();*/ while (running) { - tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500); + tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); if (tmqmessage) { cnt++; msg_process(tmqmessage); tmq_message_destroy(tmqmessage); - /*} else {*/ + /*} else {*/ /*break;*/ } } @@ -135,11 +131,10 @@ void basic_consume_loop(tmq_t *tmq, fprintf(stderr, "%% Consumer closed\n"); } -void sync_consume_loop(tmq_t *tmq, - tmq_list_t *topics) { +void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { static const int MIN_COMMIT_COUNT = 1000; - int msg_count = 0; + int msg_count = 0; tmq_resp_err_t err; if ((err = tmq_subscribe(tmq, topics))) { @@ -148,15 +143,14 @@ void sync_consume_loop(tmq_t *tmq, } while (running) { - tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500); + tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); if (tmqmessage) { msg_process(tmqmessage); tmq_message_destroy(tmqmessage); - if ((++msg_count % MIN_COMMIT_COUNT) == 0) - tmq_commit(tmq, NULL, 0); + if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0); } - } + } err = tmq_consumer_close(tmq); if (err) @@ -168,7 +162,7 @@ void sync_consume_loop(tmq_t *tmq, int main() { int code; code = init_env(); - tmq_t* tmq = build_consumer(); + tmq_t* tmq = build_consumer(); tmq_list_t* topic_list = build_topic_list(); basic_consume_loop(tmq, topic_list); /*sync_consume_loop(tmq, topic_list);*/ diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bfecf2d0f0..e09b9fd0d4 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -25,9 +25,9 @@ extern "C" { #include "taoserror.h" #include "tarray.h" #include "tcoding.h" -#include "trow.h" #include "thash.h" #include "tlist.h" +#include "trow.h" /* ------------------------ MESSAGE DEFINITIONS ------------------------ */ #define TD_MSG_NUMBER_ @@ -69,7 +69,7 @@ typedef uint16_t tmsg_t; #define TSDB_IE_TYPE_DNODE_STATE 7 typedef enum { - HEARTBEAT_TYPE_MQ = 0, + HEARTBEAT_TYPE_MQ = 0, HEARTBEAT_TYPE_QUERY = 1, // types can be added here // @@ -82,7 +82,6 @@ enum { HEARTBEAT_KEY_MQ_TMP, }; - typedef enum _mgmt_table { TSDB_MGMT_TABLE_START, TSDB_MGMT_TABLE_ACCT, @@ -192,14 +191,14 @@ typedef struct { // Submit message for one table typedef struct SSubmitBlk { - uint64_t uid; // table unique id - int32_t tid; // table id - int32_t padding; // TODO just for padding here - int32_t sversion; // data schema version - int32_t dataLen; // data part length, not including the SSubmitBlk head - int32_t schemaLen; // schema length, if length is 0, no schema exists - int16_t numOfRows; // total number of rows in current submit block - char data[]; + int64_t uid; // table unique id + int32_t tid; // table id + int32_t padding; // TODO just for padding here + int32_t sversion; // data schema version + int32_t dataLen; // data part length, not including the SSubmitBlk head + int32_t schemaLen; // schema length, if length is 0, no schema exists + int16_t numOfRows; // total number of rows in current submit block + char data[]; } SSubmitBlk; typedef struct { @@ -226,7 +225,7 @@ typedef struct { typedef struct { int32_t totalLen; int32_t len; - STSRow *row; + STSRow* row; } SSubmitBlkIter; typedef struct { @@ -302,9 +301,9 @@ typedef struct { } SConnectReq; typedef struct SEpSet { - int8_t inUse; - int8_t numOfEps; - SEp eps[TSDB_MAX_REPLICA]; + int8_t inUse; + int8_t numOfEps; + SEp eps[TSDB_MAX_REPLICA]; } SEpSet; static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) { @@ -689,9 +688,9 @@ typedef struct { } SDnodeCfg; typedef struct { - int32_t id; - int8_t isMnode; - SEp ep; + int32_t id; + int8_t isMnode; + SEp ep; } SDnodeEp; typedef struct { @@ -768,10 +767,10 @@ typedef struct { // todo refactor typedef struct SVgroupInfo { - int32_t vgId; - uint32_t hashBegin; - uint32_t hashEnd; - SEpSet epset; + int32_t vgId; + uint32_t hashBegin; + uint32_t hashEnd; + SEpSet epset; } SVgroupInfo; typedef struct { @@ -962,7 +961,7 @@ typedef struct SSubQueryMsg { uint64_t queryId; uint64_t taskId; int8_t taskType; - uint32_t sqlLen; // the query sql, + uint32_t sqlLen; // the query sql, uint32_t phyLen; char msg[]; } SSubQueryMsg; @@ -981,7 +980,6 @@ typedef struct { uint64_t taskId; } SQueryContinueReq; - typedef struct { SMsgHead header; uint64_t sId; @@ -1182,10 +1180,10 @@ typedef struct { } SMqTmrMsg; typedef struct { - const char* key; - SArray* lostConsumers; //SArray - SArray* removedConsumers; //SArray - SArray* newConsumers; //SArray + const char* key; + SArray* lostConsumers; // SArray + SArray* removedConsumers; // SArray + SArray* newConsumers; // SArray } SMqRebSubscribe; static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) { @@ -1215,10 +1213,11 @@ _err: return NULL; } -// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization / deserialization +// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization / +// deserialization typedef struct { - //SArray* rebSubscribes; //SArray - SHashObj* rebSubHash; // SHashObj + // SArray* rebSubscribes; //SArray + SHashObj* rebSubHash; // SHashObj } SMqDoRebalanceMsg; #if 0 @@ -1391,9 +1390,9 @@ static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) { } typedef struct SMqHbRsp { - int8_t status; //idle or not + int8_t status; // idle or not int8_t vnodeChanged; - int8_t epChanged; // should use new epset + int8_t epChanged; // should use new epset int8_t reserved; SEpSet epSet; } SMqHbRsp; @@ -1416,7 +1415,7 @@ static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) { } typedef struct SMqHbOneTopicBatchRsp { - char topicName[TSDB_TOPIC_FNAME_LEN]; + char topicName[TSDB_TOPIC_FNAME_LEN]; SArray* rsps; // SArray } SMqHbOneTopicBatchRsp; @@ -1446,8 +1445,8 @@ static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTop } typedef struct SMqHbBatchRsp { - int64_t consumerId; - SArray* batchRsps; // SArray + int64_t consumerId; + SArray* batchRsps; // SArray } SMqHbBatchRsp; static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) { @@ -1456,7 +1455,7 @@ static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* int32_t sz; tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { - SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i); + SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*)taosArrayGet(pBatchRsp->batchRsps, i); tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp); } return tlen; @@ -1468,7 +1467,7 @@ static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBat buf = taosDecodeFixedI32(buf, &sz); pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp)); for (int32_t i = 0; i < sz; i++) { - SMqHbOneTopicBatchRsp rsp; + SMqHbOneTopicBatchRsp rsp; buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp); buf = taosArrayPush(pBatchRsp->batchRsps, &rsp); } @@ -1508,9 +1507,7 @@ typedef struct { SArray* rsps; // SArray } SClientHbBatchRsp; -static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { - return taosIntHash_64(key, keyLen); -} +static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { return taosIntHash_64(key, keyLen); } int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq); @@ -1518,9 +1515,8 @@ void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq); int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp); void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp); - -static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) { - void *pIter = taosHashIterate(info, NULL); +static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) { + void* pIter = taosHashIterate(info, NULL); while (pIter != NULL) { SKv* kv = (SKv*)pIter; @@ -1530,12 +1526,11 @@ static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) { } } - -static FORCE_INLINE void tFreeClientHbReq(void *pReq) { +static FORCE_INLINE void tFreeClientHbReq(void* pReq) { SClientHbReq* req = (SClientHbReq*)pReq; if (req->info) { tFreeReqKvHash(req->info); - + taosHashCleanup(req->info); } } @@ -1544,7 +1539,7 @@ int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq); void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq); static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { - SClientHbBatchReq *req = (SClientHbBatchReq*)pReq; + SClientHbBatchReq* req = (SClientHbBatchReq*)pReq; if (deep) { taosArrayDestroyEx(req->reqs, tFreeClientHbReq); } else { @@ -1553,25 +1548,23 @@ static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { free(pReq); } -static FORCE_INLINE void tFreeClientKv(void *pKv) { - SKv *kv = (SKv *)pKv; +static FORCE_INLINE void tFreeClientKv(void* pKv) { + SKv* kv = (SKv*)pKv; if (kv) { tfree(kv->value); } } -static FORCE_INLINE void tFreeClientHbRsp(void *pRsp) { +static FORCE_INLINE void tFreeClientHbRsp(void* pRsp) { SClientHbRsp* rsp = (SClientHbRsp*)pRsp; if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv); } - static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) { - SClientHbBatchRsp *rsp = (SClientHbBatchRsp*)pRsp; + SClientHbBatchRsp* rsp = (SClientHbBatchRsp*)pRsp; taosArrayDestroyEx(rsp->rsps, tFreeClientHbRsp); } - int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp); void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp); @@ -1655,10 +1648,10 @@ static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* } typedef struct SMqHbMsg { - int32_t status; // ask hb endpoint - int32_t epoch; - int64_t consumerId; - SArray* pTopics; // SArray + int32_t status; // ask hb endpoint + int32_t epoch; + int64_t consumerId; + SArray* pTopics; // SArray } SMqHbMsg; static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) { @@ -1691,15 +1684,15 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { } typedef struct { - int64_t leftForVer; - int32_t vgId; - int64_t consumerId; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CONSUMER_GROUP_LEN]; - char* sql; - char* logicalPlan; - char* physicalPlan; - char* qmsg; + int64_t leftForVer; + int32_t vgId; + int64_t consumerId; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; + char* sql; + char* logicalPlan; + char* physicalPlan; + char* qmsg; } SMqSetCVgReq; static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { @@ -1730,16 +1723,16 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { } typedef struct { - int64_t leftForVer; - int32_t vgId; - int64_t oldConsumerId; - int64_t newConsumerId; - //char topicName[TSDB_TOPIC_FNAME_LEN]; - //char cgroup[TSDB_CONSUMER_GROUP_LEN]; - //char* sql; - //char* logicalPlan; - //char* physicalPlan; - //char* qmsg; + int64_t leftForVer; + int32_t vgId; + int64_t oldConsumerId; + int64_t newConsumerId; + // char topicName[TSDB_TOPIC_FNAME_LEN]; + // char cgroup[TSDB_CONSUMER_GROUP_LEN]; + // char* sql; + // char* logicalPlan; + // char* physicalPlan; + // char* qmsg; } SMqMVRebReq; static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) { @@ -1748,13 +1741,13 @@ static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pR tlen += taosEncodeFixedI32(buf, pReq->vgId); tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId); tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); - //tlen += taosEncodeString(buf, pReq->topicName); - //tlen += taosEncodeString(buf, pReq->cgroup); - //tlen += taosEncodeString(buf, pReq->sql); - //tlen += taosEncodeString(buf, pReq->logicalPlan); - //tlen += taosEncodeString(buf, pReq->physicalPlan); - //tlen += taosEncodeString(buf, pReq->qmsg); - //tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); + // tlen += taosEncodeString(buf, pReq->topicName); + // tlen += taosEncodeString(buf, pReq->cgroup); + // tlen += taosEncodeString(buf, pReq->sql); + // tlen += taosEncodeString(buf, pReq->logicalPlan); + // tlen += taosEncodeString(buf, pReq->physicalPlan); + // tlen += taosEncodeString(buf, pReq->qmsg); + // tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); return tlen; } @@ -1763,13 +1756,13 @@ static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) { buf = taosDecodeFixedI32(buf, &pReq->vgId); buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId); buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); - //buf = taosDecodeStringTo(buf, pReq->topicName); - //buf = taosDecodeStringTo(buf, pReq->cgroup); - //buf = taosDecodeString(buf, &pReq->sql); - //buf = taosDecodeString(buf, &pReq->logicalPlan); - //buf = taosDecodeString(buf, &pReq->physicalPlan); - //buf = taosDecodeString(buf, &pReq->qmsg); - //buf = tDecodeSSubQueryMsg(buf, &pReq->msg); + // buf = taosDecodeStringTo(buf, pReq->topicName); + // buf = taosDecodeStringTo(buf, pReq->cgroup); + // buf = taosDecodeString(buf, &pReq->sql); + // buf = taosDecodeString(buf, &pReq->logicalPlan); + // buf = taosDecodeString(buf, &pReq->physicalPlan); + // buf = taosDecodeString(buf, &pReq->qmsg); + // buf = tDecodeSSubQueryMsg(buf, &pReq->msg); return buf; } @@ -1791,7 +1784,7 @@ typedef struct { typedef struct { uint32_t nCols; - SSchema *pSchema; + SSchema* pSchema; } SSchemaWrapper; static FORCE_INLINE int32_t tEncodeSSchema(void** buf, const SSchema* pSchema) { @@ -1814,7 +1807,7 @@ static FORCE_INLINE void* tDecodeSSchema(void* buf, SSchema* pSchema) { static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) { int32_t tlen = 0; tlen += taosEncodeFixedU32(buf, pSW->nCols); - for (int32_t i = 0; i < pSW->nCols; i ++) { + for (int32_t i = 0; i < pSW->nCols; i++) { tlen += tEncodeSSchema(buf, &pSW->pSchema[i]); } return tlen; @@ -1822,20 +1815,20 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapp static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) { buf = taosDecodeFixedU32(buf, &pSW->nCols); - pSW->pSchema = (SSchema*) calloc(pSW->nCols, sizeof(SSchema)); + pSW->pSchema = (SSchema*)calloc(pSW->nCols, sizeof(SSchema)); if (pSW->pSchema == NULL) { return NULL; } - for (int32_t i = 0; i < pSW->nCols; i ++) { + for (int32_t i = 0; i < pSW->nCols; i++) { buf = tDecodeSSchema(buf, &pSW->pSchema[i]); } return buf; } typedef struct { - int64_t uid; - int32_t numOfRows; - char* colData; + int64_t uid; + int32_t numOfRows; + char* colData; } SMqTbData; typedef struct { @@ -1857,24 +1850,24 @@ typedef struct { int64_t rspOffset; int32_t skipLogNum; int32_t numOfTopics; - SArray* pBlockData; //SArray + SArray* pBlockData; // SArray } SMqConsumeRsp; // one req for one vg+topic typedef struct { - SMsgHead head; - //0: commit only, current offset - //1: consume only, poll next offset - //2: commit current and consume next offset - int32_t reqType; + SMsgHead head; + // 0: commit only, current offset + // 1: consume only, poll next offset + // 2: commit current and consume next offset + int32_t reqType; - int64_t reqId; - int64_t consumerId; - int64_t blockingTime; - char cgroup[TSDB_CONSUMER_GROUP_LEN]; + int64_t reqId; + int64_t consumerId; + int64_t blockingTime; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; - int64_t offset; - char topic[TSDB_TOPIC_FNAME_LEN]; + int64_t offset; + char topic[TSDB_TOPIC_FNAME_LEN]; } SMqConsumeReq; typedef struct { @@ -1884,7 +1877,7 @@ typedef struct { typedef struct { char topic[TSDB_TOPIC_FNAME_LEN]; - SArray* vgs; // SArray + SArray* vgs; // SArray } SMqSubTopicEp; typedef struct { @@ -1894,9 +1887,7 @@ typedef struct { SArray* topics; // SArray } SMqCMGetSubEpRsp; -static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { - taosArrayDestroy(pSubTopicEp->vgs); -} +static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); } static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { int32_t tlen = 0; @@ -1912,7 +1903,7 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { } static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) { - taosArrayDestroyEx(pRsp->topics, (void (*)(void*)) tDeleteSMqSubTopicEp); + taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp); } static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 79e733692c..252bc889f5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -205,6 +205,117 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu } int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { + SMqConsumeReq* pReq = pMsg->pCont; + int64_t consumerId = pReq->consumerId; + int64_t fetchOffset = pReq->offset; + /*int64_t blockingTime = pReq->blockingTime;*/ + + SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL}; + + STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); + if (pConsumer == NULL) { + pMsg->pCont = NULL; + pMsg->contLen = 0; + pMsg->code = -1; + rpcSendResponse(pMsg); + return 0; + } + int sz = taosArrayGetSize(pConsumer->topics); + ASSERT(sz == 1); + STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0); + ASSERT(strcmp(pTopic->topicName, pReq->topic) == 0); + ASSERT(pConsumer->consumerId == consumerId); + + if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) { + pTopic->committedOffset = pReq->offset; + pMsg->pCont = NULL; + pMsg->contLen = 0; + pMsg->code = 0; + rpcSendResponse(pMsg); + return 0; + } + + if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) { + pTopic->committedOffset = pReq->offset - 1; + } + + rsp.committedOffset = pTopic->committedOffset; + rsp.reqOffset = pReq->offset; + rsp.skipLogNum = 0; + + SWalHead* pHead; + while (1) { + int8_t pos = fetchOffset % TQ_BUFFER_SIZE; + if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { + break; + } + pHead = pTopic->pReadhandle->pHead; + if (pHead->head.msgType == TDMT_VND_SUBMIT) { + SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; + qTaskInfo_t task = pTopic->buffer.output[pos].task; + qSetStreamInput(task, pCont); + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); + while (1) { + SSDataBlock* pDataBlock; + uint64_t ts; + if (qExecTask(task, &pDataBlock, &ts) < 0) { + ASSERT(false); + } + if (pDataBlock == NULL) { + fetchOffset++; + rsp.skipLogNum++; + break; + } + + taosArrayPush(pRes, pDataBlock); + rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; + rsp.rspOffset = fetchOffset; + pTopic->currentOffset = fetchOffset; + + rsp.numOfTopics = 1; + rsp.pBlockData = pRes; + + int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + pMsg->code = -1; + return -1; + } + + void* abuf = buf; + tEncodeSMqConsumeRsp(&abuf, &rsp); + taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock); + pMsg->pCont = buf; + pMsg->contLen = tlen; + pMsg->code = 0; + rpcSendResponse(pMsg); + return 0; + } + } else { + fetchOffset++; + rsp.skipLogNum++; + } + } + + int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + pMsg->code = -1; + return -1; + } + + void* abuf = buf; + tEncodeSMqConsumeRsp(&abuf, &rsp); + rsp.pBlockData = NULL; + pMsg->pCont = buf; + pMsg->contLen = tlen; + pMsg->code = 0; + rpcSendResponse(pMsg); + return 0; +} + +#if 0 +int32_t tqProcessConsumeReqV0(STQ* pTq, SRpcMsg* pMsg) { SMqConsumeReq* pReq = pMsg->pCont; int64_t reqId = pReq->reqId; int64_t consumerId = pReq->consumerId; @@ -265,6 +376,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { break; } if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { + printf("read offset %ld\n", fetchOffset); // check err atomic_store_8(&pTopic->buffer.output[pos].status, 0); skip = 1; @@ -273,10 +385,10 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { // read until find TDMT_VND_SUBMIT pHead = pTopic->pReadhandle->pHead; if (pHead->head.msgType == TDMT_VND_SUBMIT) { - break; } rsp.skipLogNum++; if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { + printf("read offset %ld\n", fetchOffset); atomic_store_8(&pTopic->buffer.output[pos].status, 0); skip = 1; break; @@ -288,6 +400,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; qTaskInfo_t task = pTopic->buffer.output[pos].task; + printf("current fetch offset %ld\n", fetchOffset); qSetStreamInput(task, pCont); // SArray @@ -307,6 +420,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { // TODO copy rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; rsp.rspOffset = fetchOffset; + pTopic->currentOffset = fetchOffset; atomic_store_8(&pTopic->buffer.output[pos].status, 0); @@ -350,6 +464,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { rpcSendResponse(pMsg); return 0; } +#endif int32_t tqProcessRebReq(STQ* pTq, char* msg) { SMqMVRebReq req = {0}; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index a83e289fec..9f76c6b76a 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -52,12 +52,15 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { ASSERT(pHandle->tbIdHash); void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t)); if (ret != NULL) { + /*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/ pHandle->pBlock->tid = htonl(pHandle->pBlock->tid); pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion); pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen); pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen); pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows); return true; + } else { + /*printf("skip one tb %ld\n", pHandle->pBlock->uid);*/ } } return false; From 0bd39327f43af89fb29d675f19dd3525814e43a1 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 15 Feb 2022 15:08:46 +0800 Subject: [PATCH 2/2] fix conflict --- include/common/tmsg.h | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e336bae658..92e5651cd3 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -82,7 +82,6 @@ enum { HEARTBEAT_KEY_MQ_TMP, }; - typedef enum _mgmt_table { TSDB_MGMT_TABLE_START, TSDB_MGMT_TABLE_ACCT, @@ -328,6 +327,25 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) { } return buf; } +static FORCE_INLINE int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp) { + if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1; + if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; + for (int i = 0; i < TSDB_MAX_REPLICA; i++) { + if (tEncodeU16(pEncoder, pEp->eps[i].port) < 0) return -1; + if (tEncodeCStr(pEncoder, pEp->eps[i].fqdn) < 0) return -1; + } + return 0; +} + +static FORCE_INLINE int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp) { + if (tDecodeI8(pDecoder, &pEp->inUse) < 0) return -1; + if (tDecodeI8(pDecoder, &pEp->numOfEps) < 0) return -1; + for (int i = 0; i < TSDB_MAX_REPLICA; i++) { + if (tDecodeU16(pDecoder, &pEp->eps[i].port) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pEp->eps[i].fqdn) < 0) return -1; + } + return 0; +} typedef struct { int32_t acctId; @@ -847,7 +865,7 @@ typedef struct { * payloadLen is the length of payload */ typedef struct { - int32_t type; + int32_t type; char db[TSDB_DB_FNAME_LEN]; int32_t payloadLen; char* payload; @@ -1594,7 +1612,6 @@ static FORCE_INLINE void tFreeClientHbRsp(void* pRsp) { if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv); } - static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) { SClientHbBatchRsp* rsp = (SClientHbBatchRsp*)pRsp; taosArrayDestroyEx(rsp->rsps, tFreeClientHbRsp);