diff --git a/example/src/tmq.c b/example/src/tmq.c index 99e0c443dd..bad789d0e1 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -13,16 +13,14 @@ * along with this program. If not, see . */ +#include #include #include -#include #include #include "taos.h" -static int running = 1; -static void msg_process(tmq_message_t* message) { - tmqShowMsg(message); -} +static int running = 1; +static void msg_process(tmq_message_t* message) { tmqShowMsg(message); } int32_t init_env() { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -44,29 +42,28 @@ int32_t init_env() { } taos_free_result(pRes); - /*pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");*/ - /*if (taos_errno(pRes) != 0) {*/ - /*printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));*/ - /*return -1;*/ - /*}*/ - /*taos_free_result(pRes);*/ + pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); - /*pRes = taos_query(pConn, "create table if not exists tu using st1 tags(1)");*/ - /*if (taos_errno(pRes) != 0) {*/ - /*printf("failed to create child table tu, reason:%s\n", taos_errstr(pRes));*/ - /*return -1;*/ - /*}*/ - /*taos_free_result(pRes);*/ + pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags(1)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); - /*pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)");*/ - /*if (taos_errno(pRes) != 0) {*/ - /*printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));*/ - /*return -1;*/ - /*}*/ - /*taos_free_result(pRes);*/ + pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); - - const char* sql = "select * from st1"; + const char* sql = "select * from tu2"; pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); if (taos_errno(pRes) != 0) { printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes)); @@ -95,7 +92,7 @@ tmq_t* build_consumer() { tmq_list_t* topic_list = tmq_list_new(); tmq_list_append(topic_list, "test_stb_topic_1"); tmq_subscribe(tmq, topic_list); - return NULL; + return NULL; } tmq_list_t* build_topic_list() { @@ -104,8 +101,7 @@ tmq_list_t* build_topic_list() { return topic_list; } -void basic_consume_loop(tmq_t *tmq, - tmq_list_t *topics) { +void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { tmq_resp_err_t err; if ((err = tmq_subscribe(tmq, topics))) { @@ -116,12 +112,12 @@ void basic_consume_loop(tmq_t *tmq, int32_t cnt = 0; /*clock_t startTime = clock();*/ while (running) { - tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500); + tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); if (tmqmessage) { cnt++; msg_process(tmqmessage); tmq_message_destroy(tmqmessage); - /*} else {*/ + /*} else {*/ /*break;*/ } } @@ -135,11 +131,10 @@ void basic_consume_loop(tmq_t *tmq, fprintf(stderr, "%% Consumer closed\n"); } -void sync_consume_loop(tmq_t *tmq, - tmq_list_t *topics) { +void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { static const int MIN_COMMIT_COUNT = 1000; - int msg_count = 0; + int msg_count = 0; tmq_resp_err_t err; if ((err = tmq_subscribe(tmq, topics))) { @@ -148,15 +143,14 @@ void sync_consume_loop(tmq_t *tmq, } while (running) { - tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500); + tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); if (tmqmessage) { msg_process(tmqmessage); tmq_message_destroy(tmqmessage); - if ((++msg_count % MIN_COMMIT_COUNT) == 0) - tmq_commit(tmq, NULL, 0); + if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0); } - } + } err = tmq_consumer_close(tmq); if (err) @@ -168,7 +162,7 @@ void sync_consume_loop(tmq_t *tmq, int main() { int code; code = init_env(); - tmq_t* tmq = build_consumer(); + tmq_t* tmq = build_consumer(); tmq_list_t* topic_list = build_topic_list(); basic_consume_loop(tmq, topic_list); /*sync_consume_loop(tmq, topic_list);*/ diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b25f799868..92e5651cd3 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -25,9 +25,9 @@ extern "C" { #include "taoserror.h" #include "tarray.h" #include "tcoding.h" -#include "trow.h" #include "thash.h" #include "tlist.h" +#include "trow.h" /* ------------------------ MESSAGE DEFINITIONS ------------------------ */ #define TD_MSG_NUMBER_ @@ -69,7 +69,7 @@ typedef uint16_t tmsg_t; #define TSDB_IE_TYPE_DNODE_STATE 7 typedef enum { - HEARTBEAT_TYPE_MQ = 0, + HEARTBEAT_TYPE_MQ = 0, HEARTBEAT_TYPE_QUERY = 1, // types can be added here // @@ -82,7 +82,6 @@ enum { HEARTBEAT_KEY_MQ_TMP, }; - typedef enum _mgmt_table { TSDB_MGMT_TABLE_START, TSDB_MGMT_TABLE_ACCT, @@ -192,14 +191,14 @@ typedef struct { // Submit message for one table typedef struct SSubmitBlk { - uint64_t uid; // table unique id - int32_t tid; // table id - int32_t padding; // TODO just for padding here - int32_t sversion; // data schema version - int32_t dataLen; // data part length, not including the SSubmitBlk head - int32_t schemaLen; // schema length, if length is 0, no schema exists - int16_t numOfRows; // total number of rows in current submit block - char data[]; + int64_t uid; // table unique id + int32_t tid; // table id + int32_t padding; // TODO just for padding here + int32_t sversion; // data schema version + int32_t dataLen; // data part length, not including the SSubmitBlk head + int32_t schemaLen; // schema length, if length is 0, no schema exists + int16_t numOfRows; // total number of rows in current submit block + char data[]; } SSubmitBlk; typedef struct { @@ -226,7 +225,7 @@ typedef struct { typedef struct { int32_t totalLen; int32_t len; - STSRow *row; + STSRow* row; } SSubmitBlkIter; typedef struct { @@ -303,9 +302,9 @@ typedef struct { } SConnectReq; typedef struct SEpSet { - int8_t inUse; - int8_t numOfEps; - SEp eps[TSDB_MAX_REPLICA]; + int8_t inUse; + int8_t numOfEps; + SEp eps[TSDB_MAX_REPLICA]; } SEpSet; static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) { @@ -328,7 +327,6 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) { } return buf; } - static FORCE_INLINE int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp) { if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1; if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; @@ -736,9 +734,9 @@ typedef struct { } SDnodeCfg; typedef struct { - int32_t id; - int8_t isMnode; - SEp ep; + int32_t id; + int8_t isMnode; + SEp ep; } SDnodeEp; typedef struct { @@ -815,10 +813,10 @@ typedef struct { // todo refactor typedef struct SVgroupInfo { - int32_t vgId; - uint32_t hashBegin; - uint32_t hashEnd; - SEpSet epset; + int32_t vgId; + uint32_t hashBegin; + uint32_t hashEnd; + SEpSet epset; } SVgroupInfo; typedef struct { @@ -867,7 +865,7 @@ typedef struct { * payloadLen is the length of payload */ typedef struct { - int32_t type; + int32_t type; char db[TSDB_DB_FNAME_LEN]; int32_t payloadLen; char* payload; @@ -1024,7 +1022,7 @@ typedef struct SSubQueryMsg { uint64_t queryId; uint64_t taskId; int8_t taskType; - uint32_t sqlLen; // the query sql, + uint32_t sqlLen; // the query sql, uint32_t phyLen; char msg[]; } SSubQueryMsg; @@ -1043,7 +1041,6 @@ typedef struct { uint64_t taskId; } SQueryContinueReq; - typedef struct { SMsgHead header; uint64_t sId; @@ -1244,10 +1241,10 @@ typedef struct { } SMqTmrMsg; typedef struct { - const char* key; - SArray* lostConsumers; //SArray - SArray* removedConsumers; //SArray - SArray* newConsumers; //SArray + const char* key; + SArray* lostConsumers; // SArray + SArray* removedConsumers; // SArray + SArray* newConsumers; // SArray } SMqRebSubscribe; static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) { @@ -1277,10 +1274,11 @@ _err: return NULL; } -// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization / deserialization +// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization / +// deserialization typedef struct { - //SArray* rebSubscribes; //SArray - SHashObj* rebSubHash; // SHashObj + // SArray* rebSubscribes; //SArray + SHashObj* rebSubHash; // SHashObj } SMqDoRebalanceMsg; #if 0 @@ -1453,9 +1451,9 @@ static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) { } typedef struct SMqHbRsp { - int8_t status; //idle or not + int8_t status; // idle or not int8_t vnodeChanged; - int8_t epChanged; // should use new epset + int8_t epChanged; // should use new epset int8_t reserved; SEpSet epSet; } SMqHbRsp; @@ -1478,7 +1476,7 @@ static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) { } typedef struct SMqHbOneTopicBatchRsp { - char topicName[TSDB_TOPIC_FNAME_LEN]; + char topicName[TSDB_TOPIC_FNAME_LEN]; SArray* rsps; // SArray } SMqHbOneTopicBatchRsp; @@ -1508,8 +1506,8 @@ static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTop } typedef struct SMqHbBatchRsp { - int64_t consumerId; - SArray* batchRsps; // SArray + int64_t consumerId; + SArray* batchRsps; // SArray } SMqHbBatchRsp; static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) { @@ -1518,7 +1516,7 @@ static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* int32_t sz; tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { - SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i); + SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*)taosArrayGet(pBatchRsp->batchRsps, i); tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp); } return tlen; @@ -1530,7 +1528,7 @@ static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBat buf = taosDecodeFixedI32(buf, &sz); pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp)); for (int32_t i = 0; i < sz; i++) { - SMqHbOneTopicBatchRsp rsp; + SMqHbOneTopicBatchRsp rsp; buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp); buf = taosArrayPush(pBatchRsp->batchRsps, &rsp); } @@ -1702,10 +1700,10 @@ static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* } typedef struct SMqHbMsg { - int32_t status; // ask hb endpoint - int32_t epoch; - int64_t consumerId; - SArray* pTopics; // SArray + int32_t status; // ask hb endpoint + int32_t epoch; + int64_t consumerId; + SArray* pTopics; // SArray } SMqHbMsg; static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) { @@ -1738,15 +1736,15 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { } typedef struct { - int64_t leftForVer; - int32_t vgId; - int64_t consumerId; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CONSUMER_GROUP_LEN]; - char* sql; - char* logicalPlan; - char* physicalPlan; - char* qmsg; + int64_t leftForVer; + int32_t vgId; + int64_t consumerId; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; + char* sql; + char* logicalPlan; + char* physicalPlan; + char* qmsg; } SMqSetCVgReq; static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { @@ -1777,16 +1775,16 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { } typedef struct { - int64_t leftForVer; - int32_t vgId; - int64_t oldConsumerId; - int64_t newConsumerId; - //char topicName[TSDB_TOPIC_FNAME_LEN]; - //char cgroup[TSDB_CONSUMER_GROUP_LEN]; - //char* sql; - //char* logicalPlan; - //char* physicalPlan; - //char* qmsg; + int64_t leftForVer; + int32_t vgId; + int64_t oldConsumerId; + int64_t newConsumerId; + // char topicName[TSDB_TOPIC_FNAME_LEN]; + // char cgroup[TSDB_CONSUMER_GROUP_LEN]; + // char* sql; + // char* logicalPlan; + // char* physicalPlan; + // char* qmsg; } SMqMVRebReq; static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) { @@ -1795,13 +1793,13 @@ static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pR tlen += taosEncodeFixedI32(buf, pReq->vgId); tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId); tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); - //tlen += taosEncodeString(buf, pReq->topicName); - //tlen += taosEncodeString(buf, pReq->cgroup); - //tlen += taosEncodeString(buf, pReq->sql); - //tlen += taosEncodeString(buf, pReq->logicalPlan); - //tlen += taosEncodeString(buf, pReq->physicalPlan); - //tlen += taosEncodeString(buf, pReq->qmsg); - //tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); + // tlen += taosEncodeString(buf, pReq->topicName); + // tlen += taosEncodeString(buf, pReq->cgroup); + // tlen += taosEncodeString(buf, pReq->sql); + // tlen += taosEncodeString(buf, pReq->logicalPlan); + // tlen += taosEncodeString(buf, pReq->physicalPlan); + // tlen += taosEncodeString(buf, pReq->qmsg); + // tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); return tlen; } @@ -1810,13 +1808,13 @@ static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) { buf = taosDecodeFixedI32(buf, &pReq->vgId); buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId); buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); - //buf = taosDecodeStringTo(buf, pReq->topicName); - //buf = taosDecodeStringTo(buf, pReq->cgroup); - //buf = taosDecodeString(buf, &pReq->sql); - //buf = taosDecodeString(buf, &pReq->logicalPlan); - //buf = taosDecodeString(buf, &pReq->physicalPlan); - //buf = taosDecodeString(buf, &pReq->qmsg); - //buf = tDecodeSSubQueryMsg(buf, &pReq->msg); + // buf = taosDecodeStringTo(buf, pReq->topicName); + // buf = taosDecodeStringTo(buf, pReq->cgroup); + // buf = taosDecodeString(buf, &pReq->sql); + // buf = taosDecodeString(buf, &pReq->logicalPlan); + // buf = taosDecodeString(buf, &pReq->physicalPlan); + // buf = taosDecodeString(buf, &pReq->qmsg); + // buf = tDecodeSSubQueryMsg(buf, &pReq->msg); return buf; } @@ -1838,7 +1836,7 @@ typedef struct { typedef struct { uint32_t nCols; - SSchema *pSchema; + SSchema* pSchema; } SSchemaWrapper; static FORCE_INLINE int32_t tEncodeSSchema(void** buf, const SSchema* pSchema) { @@ -1861,7 +1859,7 @@ static FORCE_INLINE void* tDecodeSSchema(void* buf, SSchema* pSchema) { static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) { int32_t tlen = 0; tlen += taosEncodeFixedU32(buf, pSW->nCols); - for (int32_t i = 0; i < pSW->nCols; i ++) { + for (int32_t i = 0; i < pSW->nCols; i++) { tlen += tEncodeSSchema(buf, &pSW->pSchema[i]); } return tlen; @@ -1869,20 +1867,20 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapp static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) { buf = taosDecodeFixedU32(buf, &pSW->nCols); - pSW->pSchema = (SSchema*) calloc(pSW->nCols, sizeof(SSchema)); + pSW->pSchema = (SSchema*)calloc(pSW->nCols, sizeof(SSchema)); if (pSW->pSchema == NULL) { return NULL; } - for (int32_t i = 0; i < pSW->nCols; i ++) { + for (int32_t i = 0; i < pSW->nCols; i++) { buf = tDecodeSSchema(buf, &pSW->pSchema[i]); } return buf; } typedef struct { - int64_t uid; - int32_t numOfRows; - char* colData; + int64_t uid; + int32_t numOfRows; + char* colData; } SMqTbData; typedef struct { @@ -1904,24 +1902,24 @@ typedef struct { int64_t rspOffset; int32_t skipLogNum; int32_t numOfTopics; - SArray* pBlockData; //SArray + SArray* pBlockData; // SArray } SMqConsumeRsp; // one req for one vg+topic typedef struct { - SMsgHead head; - //0: commit only, current offset - //1: consume only, poll next offset - //2: commit current and consume next offset - int32_t reqType; + SMsgHead head; + // 0: commit only, current offset + // 1: consume only, poll next offset + // 2: commit current and consume next offset + int32_t reqType; - int64_t reqId; - int64_t consumerId; - int64_t blockingTime; - char cgroup[TSDB_CONSUMER_GROUP_LEN]; + int64_t reqId; + int64_t consumerId; + int64_t blockingTime; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; - int64_t offset; - char topic[TSDB_TOPIC_FNAME_LEN]; + int64_t offset; + char topic[TSDB_TOPIC_FNAME_LEN]; } SMqConsumeReq; typedef struct { @@ -1931,7 +1929,7 @@ typedef struct { typedef struct { char topic[TSDB_TOPIC_FNAME_LEN]; - SArray* vgs; // SArray + SArray* vgs; // SArray } SMqSubTopicEp; typedef struct { @@ -1941,9 +1939,7 @@ typedef struct { SArray* topics; // SArray } SMqCMGetSubEpRsp; -static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { - taosArrayDestroy(pSubTopicEp->vgs); -} +static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); } static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { int32_t tlen = 0; @@ -1959,7 +1955,7 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { } static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) { - taosArrayDestroyEx(pRsp->topics, (void (*)(void*)) tDeleteSMqSubTopicEp); + taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp); } static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { @@ -2026,4 +2022,4 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p } #endif -#endif /*_TD_COMMON_TAOS_MSG_H_*/ \ No newline at end of file +#endif /*_TD_COMMON_TAOS_MSG_H_*/ diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 79e733692c..252bc889f5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -205,6 +205,117 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu } int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { + SMqConsumeReq* pReq = pMsg->pCont; + int64_t consumerId = pReq->consumerId; + int64_t fetchOffset = pReq->offset; + /*int64_t blockingTime = pReq->blockingTime;*/ + + SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL}; + + STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); + if (pConsumer == NULL) { + pMsg->pCont = NULL; + pMsg->contLen = 0; + pMsg->code = -1; + rpcSendResponse(pMsg); + return 0; + } + int sz = taosArrayGetSize(pConsumer->topics); + ASSERT(sz == 1); + STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0); + ASSERT(strcmp(pTopic->topicName, pReq->topic) == 0); + ASSERT(pConsumer->consumerId == consumerId); + + if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) { + pTopic->committedOffset = pReq->offset; + pMsg->pCont = NULL; + pMsg->contLen = 0; + pMsg->code = 0; + rpcSendResponse(pMsg); + return 0; + } + + if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) { + pTopic->committedOffset = pReq->offset - 1; + } + + rsp.committedOffset = pTopic->committedOffset; + rsp.reqOffset = pReq->offset; + rsp.skipLogNum = 0; + + SWalHead* pHead; + while (1) { + int8_t pos = fetchOffset % TQ_BUFFER_SIZE; + if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { + break; + } + pHead = pTopic->pReadhandle->pHead; + if (pHead->head.msgType == TDMT_VND_SUBMIT) { + SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; + qTaskInfo_t task = pTopic->buffer.output[pos].task; + qSetStreamInput(task, pCont); + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); + while (1) { + SSDataBlock* pDataBlock; + uint64_t ts; + if (qExecTask(task, &pDataBlock, &ts) < 0) { + ASSERT(false); + } + if (pDataBlock == NULL) { + fetchOffset++; + rsp.skipLogNum++; + break; + } + + taosArrayPush(pRes, pDataBlock); + rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; + rsp.rspOffset = fetchOffset; + pTopic->currentOffset = fetchOffset; + + rsp.numOfTopics = 1; + rsp.pBlockData = pRes; + + int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + pMsg->code = -1; + return -1; + } + + void* abuf = buf; + tEncodeSMqConsumeRsp(&abuf, &rsp); + taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock); + pMsg->pCont = buf; + pMsg->contLen = tlen; + pMsg->code = 0; + rpcSendResponse(pMsg); + return 0; + } + } else { + fetchOffset++; + rsp.skipLogNum++; + } + } + + int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + pMsg->code = -1; + return -1; + } + + void* abuf = buf; + tEncodeSMqConsumeRsp(&abuf, &rsp); + rsp.pBlockData = NULL; + pMsg->pCont = buf; + pMsg->contLen = tlen; + pMsg->code = 0; + rpcSendResponse(pMsg); + return 0; +} + +#if 0 +int32_t tqProcessConsumeReqV0(STQ* pTq, SRpcMsg* pMsg) { SMqConsumeReq* pReq = pMsg->pCont; int64_t reqId = pReq->reqId; int64_t consumerId = pReq->consumerId; @@ -265,6 +376,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { break; } if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { + printf("read offset %ld\n", fetchOffset); // check err atomic_store_8(&pTopic->buffer.output[pos].status, 0); skip = 1; @@ -273,10 +385,10 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { // read until find TDMT_VND_SUBMIT pHead = pTopic->pReadhandle->pHead; if (pHead->head.msgType == TDMT_VND_SUBMIT) { - break; } rsp.skipLogNum++; if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { + printf("read offset %ld\n", fetchOffset); atomic_store_8(&pTopic->buffer.output[pos].status, 0); skip = 1; break; @@ -288,6 +400,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; qTaskInfo_t task = pTopic->buffer.output[pos].task; + printf("current fetch offset %ld\n", fetchOffset); qSetStreamInput(task, pCont); // SArray @@ -307,6 +420,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { // TODO copy rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; rsp.rspOffset = fetchOffset; + pTopic->currentOffset = fetchOffset; atomic_store_8(&pTopic->buffer.output[pos].status, 0); @@ -350,6 +464,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { rpcSendResponse(pMsg); return 0; } +#endif int32_t tqProcessRebReq(STQ* pTq, char* msg) { SMqMVRebReq req = {0}; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index a83e289fec..9f76c6b76a 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -52,12 +52,15 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { ASSERT(pHandle->tbIdHash); void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t)); if (ret != NULL) { + /*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/ pHandle->pBlock->tid = htonl(pHandle->pBlock->tid); pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion); pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen); pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen); pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows); return true; + } else { + /*printf("skip one tb %ld\n", pHandle->pBlock->uid);*/ } } return false;