Merge branch '3.0' into feature/3.0_liaohj

This commit is contained in:
Shuduo Sang 2022-02-15 07:27:29 +00:00
commit 2741e5add6
4 changed files with 249 additions and 141 deletions

View File

@ -13,16 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <assert.h>
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <assert.h>
#include <time.h> #include <time.h>
#include "taos.h" #include "taos.h"
static int running = 1; static int running = 1;
static void msg_process(tmq_message_t* message) { static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }
tmqShowMsg(message);
}
int32_t init_env() { int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -44,29 +42,28 @@ int32_t init_env() {
} }
taos_free_result(pRes); taos_free_result(pRes);
/*pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");*/ pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");
/*if (taos_errno(pRes) != 0) {*/ if (taos_errno(pRes) != 0) {
/*printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));*/ printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));
/*return -1;*/ return -1;
/*}*/ }
/*taos_free_result(pRes);*/ taos_free_result(pRes);
/*pRes = taos_query(pConn, "create table if not exists tu using st1 tags(1)");*/ pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags(1)");
/*if (taos_errno(pRes) != 0) {*/ if (taos_errno(pRes) != 0) {
/*printf("failed to create child table tu, reason:%s\n", taos_errstr(pRes));*/ printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
/*return -1;*/ return -1;
/*}*/ }
/*taos_free_result(pRes);*/ taos_free_result(pRes);
/*pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)");*/ pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)");
/*if (taos_errno(pRes) != 0) {*/ if (taos_errno(pRes) != 0) {
/*printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));*/ printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
/*return -1;*/ return -1;
/*}*/ }
/*taos_free_result(pRes);*/ taos_free_result(pRes);
const char* sql = "select * from tu2";
const char* sql = "select * from st1";
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));
@ -95,7 +92,7 @@ tmq_t* build_consumer() {
tmq_list_t* topic_list = tmq_list_new(); tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1"); tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list); tmq_subscribe(tmq, topic_list);
return NULL; return NULL;
} }
tmq_list_t* build_topic_list() { tmq_list_t* build_topic_list() {
@ -104,8 +101,7 @@ tmq_list_t* build_topic_list() {
return topic_list; return topic_list;
} }
void basic_consume_loop(tmq_t *tmq, void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_list_t *topics) {
tmq_resp_err_t err; tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) { if ((err = tmq_subscribe(tmq, topics))) {
@ -116,12 +112,12 @@ void basic_consume_loop(tmq_t *tmq,
int32_t cnt = 0; int32_t cnt = 0;
/*clock_t startTime = clock();*/ /*clock_t startTime = clock();*/
while (running) { while (running) {
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500); tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) { if (tmqmessage) {
cnt++; cnt++;
msg_process(tmqmessage); msg_process(tmqmessage);
tmq_message_destroy(tmqmessage); tmq_message_destroy(tmqmessage);
/*} else {*/ /*} else {*/
/*break;*/ /*break;*/
} }
} }
@ -135,11 +131,10 @@ void basic_consume_loop(tmq_t *tmq,
fprintf(stderr, "%% Consumer closed\n"); fprintf(stderr, "%% Consumer closed\n");
} }
void sync_consume_loop(tmq_t *tmq, void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_list_t *topics) {
static const int MIN_COMMIT_COUNT = 1000; static const int MIN_COMMIT_COUNT = 1000;
int msg_count = 0; int msg_count = 0;
tmq_resp_err_t err; tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) { if ((err = tmq_subscribe(tmq, topics))) {
@ -148,15 +143,14 @@ void sync_consume_loop(tmq_t *tmq,
} }
while (running) { while (running) {
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500); tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) { if (tmqmessage) {
msg_process(tmqmessage); msg_process(tmqmessage);
tmq_message_destroy(tmqmessage); tmq_message_destroy(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
tmq_commit(tmq, NULL, 0);
} }
} }
err = tmq_consumer_close(tmq); err = tmq_consumer_close(tmq);
if (err) if (err)
@ -168,7 +162,7 @@ void sync_consume_loop(tmq_t *tmq,
int main() { int main() {
int code; int code;
code = init_env(); code = init_env();
tmq_t* tmq = build_consumer(); tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list(); tmq_list_t* topic_list = build_topic_list();
basic_consume_loop(tmq, topic_list); basic_consume_loop(tmq, topic_list);
/*sync_consume_loop(tmq, topic_list);*/ /*sync_consume_loop(tmq, topic_list);*/

View File

@ -25,9 +25,9 @@ extern "C" {
#include "taoserror.h" #include "taoserror.h"
#include "tarray.h" #include "tarray.h"
#include "tcoding.h" #include "tcoding.h"
#include "trow.h"
#include "thash.h" #include "thash.h"
#include "tlist.h" #include "tlist.h"
#include "trow.h"
/* ------------------------ MESSAGE DEFINITIONS ------------------------ */ /* ------------------------ MESSAGE DEFINITIONS ------------------------ */
#define TD_MSG_NUMBER_ #define TD_MSG_NUMBER_
@ -69,7 +69,7 @@ typedef uint16_t tmsg_t;
#define TSDB_IE_TYPE_DNODE_STATE 7 #define TSDB_IE_TYPE_DNODE_STATE 7
typedef enum { typedef enum {
HEARTBEAT_TYPE_MQ = 0, HEARTBEAT_TYPE_MQ = 0,
HEARTBEAT_TYPE_QUERY = 1, HEARTBEAT_TYPE_QUERY = 1,
// types can be added here // types can be added here
// //
@ -82,7 +82,6 @@ enum {
HEARTBEAT_KEY_MQ_TMP, HEARTBEAT_KEY_MQ_TMP,
}; };
typedef enum _mgmt_table { typedef enum _mgmt_table {
TSDB_MGMT_TABLE_START, TSDB_MGMT_TABLE_START,
TSDB_MGMT_TABLE_ACCT, TSDB_MGMT_TABLE_ACCT,
@ -192,14 +191,14 @@ typedef struct {
// Submit message for one table // Submit message for one table
typedef struct SSubmitBlk { typedef struct SSubmitBlk {
uint64_t uid; // table unique id int64_t uid; // table unique id
int32_t tid; // table id int32_t tid; // table id
int32_t padding; // TODO just for padding here int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists int32_t schemaLen; // schema length, if length is 0, no schema exists
int16_t numOfRows; // total number of rows in current submit block int16_t numOfRows; // total number of rows in current submit block
char data[]; char data[];
} SSubmitBlk; } SSubmitBlk;
typedef struct { typedef struct {
@ -226,7 +225,7 @@ typedef struct {
typedef struct { typedef struct {
int32_t totalLen; int32_t totalLen;
int32_t len; int32_t len;
STSRow *row; STSRow* row;
} SSubmitBlkIter; } SSubmitBlkIter;
typedef struct { typedef struct {
@ -303,9 +302,9 @@ typedef struct {
} SConnectReq; } SConnectReq;
typedef struct SEpSet { typedef struct SEpSet {
int8_t inUse; int8_t inUse;
int8_t numOfEps; int8_t numOfEps;
SEp eps[TSDB_MAX_REPLICA]; SEp eps[TSDB_MAX_REPLICA];
} SEpSet; } SEpSet;
static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) { static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) {
@ -328,7 +327,6 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) {
} }
return buf; return buf;
} }
static FORCE_INLINE int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp) { static FORCE_INLINE int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp) {
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1; if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1;
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
@ -736,9 +734,9 @@ typedef struct {
} SDnodeCfg; } SDnodeCfg;
typedef struct { typedef struct {
int32_t id; int32_t id;
int8_t isMnode; int8_t isMnode;
SEp ep; SEp ep;
} SDnodeEp; } SDnodeEp;
typedef struct { typedef struct {
@ -815,10 +813,10 @@ typedef struct {
// todo refactor // todo refactor
typedef struct SVgroupInfo { typedef struct SVgroupInfo {
int32_t vgId; int32_t vgId;
uint32_t hashBegin; uint32_t hashBegin;
uint32_t hashEnd; uint32_t hashEnd;
SEpSet epset; SEpSet epset;
} SVgroupInfo; } SVgroupInfo;
typedef struct { typedef struct {
@ -867,7 +865,7 @@ typedef struct {
* payloadLen is the length of payload * payloadLen is the length of payload
*/ */
typedef struct { typedef struct {
int32_t type; int32_t type;
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
int32_t payloadLen; int32_t payloadLen;
char* payload; char* payload;
@ -1024,7 +1022,7 @@ typedef struct SSubQueryMsg {
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
int8_t taskType; int8_t taskType;
uint32_t sqlLen; // the query sql, uint32_t sqlLen; // the query sql,
uint32_t phyLen; uint32_t phyLen;
char msg[]; char msg[];
} SSubQueryMsg; } SSubQueryMsg;
@ -1043,7 +1041,6 @@ typedef struct {
uint64_t taskId; uint64_t taskId;
} SQueryContinueReq; } SQueryContinueReq;
typedef struct { typedef struct {
SMsgHead header; SMsgHead header;
uint64_t sId; uint64_t sId;
@ -1244,10 +1241,10 @@ typedef struct {
} SMqTmrMsg; } SMqTmrMsg;
typedef struct { typedef struct {
const char* key; const char* key;
SArray* lostConsumers; //SArray<int64_t> SArray* lostConsumers; // SArray<int64_t>
SArray* removedConsumers; //SArray<int64_t> SArray* removedConsumers; // SArray<int64_t>
SArray* newConsumers; //SArray<int64_t> SArray* newConsumers; // SArray<int64_t>
} SMqRebSubscribe; } SMqRebSubscribe;
static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) { static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
@ -1277,10 +1274,11 @@ _err:
return NULL; return NULL;
} }
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization / deserialization // this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization /
// deserialization
typedef struct { typedef struct {
//SArray* rebSubscribes; //SArray<SMqRebSubscribe> // SArray* rebSubscribes; //SArray<SMqRebSubscribe>
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe> SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
} SMqDoRebalanceMsg; } SMqDoRebalanceMsg;
#if 0 #if 0
@ -1453,9 +1451,9 @@ static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) {
} }
typedef struct SMqHbRsp { typedef struct SMqHbRsp {
int8_t status; //idle or not int8_t status; // idle or not
int8_t vnodeChanged; int8_t vnodeChanged;
int8_t epChanged; // should use new epset int8_t epChanged; // should use new epset
int8_t reserved; int8_t reserved;
SEpSet epSet; SEpSet epSet;
} SMqHbRsp; } SMqHbRsp;
@ -1478,7 +1476,7 @@ static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) {
} }
typedef struct SMqHbOneTopicBatchRsp { typedef struct SMqHbOneTopicBatchRsp {
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
SArray* rsps; // SArray<SMqHbRsp> SArray* rsps; // SArray<SMqHbRsp>
} SMqHbOneTopicBatchRsp; } SMqHbOneTopicBatchRsp;
@ -1508,8 +1506,8 @@ static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTop
} }
typedef struct SMqHbBatchRsp { typedef struct SMqHbBatchRsp {
int64_t consumerId; int64_t consumerId;
SArray* batchRsps; // SArray<SMqHbOneTopicBatchRsp> SArray* batchRsps; // SArray<SMqHbOneTopicBatchRsp>
} SMqHbBatchRsp; } SMqHbBatchRsp;
static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) { static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) {
@ -1518,7 +1516,7 @@ static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp*
int32_t sz; int32_t sz;
tlen += taosEncodeFixedI32(buf, sz); tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i); SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*)taosArrayGet(pBatchRsp->batchRsps, i);
tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp); tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp);
} }
return tlen; return tlen;
@ -1530,7 +1528,7 @@ static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBat
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp)); pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp));
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqHbOneTopicBatchRsp rsp; SMqHbOneTopicBatchRsp rsp;
buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp); buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp);
buf = taosArrayPush(pBatchRsp->batchRsps, &rsp); buf = taosArrayPush(pBatchRsp->batchRsps, &rsp);
} }
@ -1702,10 +1700,10 @@ static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo*
} }
typedef struct SMqHbMsg { typedef struct SMqHbMsg {
int32_t status; // ask hb endpoint int32_t status; // ask hb endpoint
int32_t epoch; int32_t epoch;
int64_t consumerId; int64_t consumerId;
SArray* pTopics; // SArray<SMqHbTopicInfo> SArray* pTopics; // SArray<SMqHbTopicInfo>
} SMqHbMsg; } SMqHbMsg;
static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) { static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) {
@ -1738,15 +1736,15 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
} }
typedef struct { typedef struct {
int64_t leftForVer; int64_t leftForVer;
int32_t vgId; int32_t vgId;
int64_t consumerId; int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
char* sql; char* sql;
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;
char* qmsg; char* qmsg;
} SMqSetCVgReq; } SMqSetCVgReq;
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
@ -1777,16 +1775,16 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
} }
typedef struct { typedef struct {
int64_t leftForVer; int64_t leftForVer;
int32_t vgId; int32_t vgId;
int64_t oldConsumerId; int64_t oldConsumerId;
int64_t newConsumerId; int64_t newConsumerId;
//char topicName[TSDB_TOPIC_FNAME_LEN]; // char topicName[TSDB_TOPIC_FNAME_LEN];
//char cgroup[TSDB_CONSUMER_GROUP_LEN]; // char cgroup[TSDB_CONSUMER_GROUP_LEN];
//char* sql; // char* sql;
//char* logicalPlan; // char* logicalPlan;
//char* physicalPlan; // char* physicalPlan;
//char* qmsg; // char* qmsg;
} SMqMVRebReq; } SMqMVRebReq;
static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) { static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) {
@ -1795,13 +1793,13 @@ static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pR
tlen += taosEncodeFixedI32(buf, pReq->vgId); tlen += taosEncodeFixedI32(buf, pReq->vgId);
tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId); tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId);
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
//tlen += taosEncodeString(buf, pReq->topicName); // tlen += taosEncodeString(buf, pReq->topicName);
//tlen += taosEncodeString(buf, pReq->cgroup); // tlen += taosEncodeString(buf, pReq->cgroup);
//tlen += taosEncodeString(buf, pReq->sql); // tlen += taosEncodeString(buf, pReq->sql);
//tlen += taosEncodeString(buf, pReq->logicalPlan); // tlen += taosEncodeString(buf, pReq->logicalPlan);
//tlen += taosEncodeString(buf, pReq->physicalPlan); // tlen += taosEncodeString(buf, pReq->physicalPlan);
//tlen += taosEncodeString(buf, pReq->qmsg); // tlen += taosEncodeString(buf, pReq->qmsg);
//tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); // tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return tlen; return tlen;
} }
@ -1810,13 +1808,13 @@ static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) {
buf = taosDecodeFixedI32(buf, &pReq->vgId); buf = taosDecodeFixedI32(buf, &pReq->vgId);
buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId); buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId);
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
//buf = taosDecodeStringTo(buf, pReq->topicName); // buf = taosDecodeStringTo(buf, pReq->topicName);
//buf = taosDecodeStringTo(buf, pReq->cgroup); // buf = taosDecodeStringTo(buf, pReq->cgroup);
//buf = taosDecodeString(buf, &pReq->sql); // buf = taosDecodeString(buf, &pReq->sql);
//buf = taosDecodeString(buf, &pReq->logicalPlan); // buf = taosDecodeString(buf, &pReq->logicalPlan);
//buf = taosDecodeString(buf, &pReq->physicalPlan); // buf = taosDecodeString(buf, &pReq->physicalPlan);
//buf = taosDecodeString(buf, &pReq->qmsg); // buf = taosDecodeString(buf, &pReq->qmsg);
//buf = tDecodeSSubQueryMsg(buf, &pReq->msg); // buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return buf; return buf;
} }
@ -1838,7 +1836,7 @@ typedef struct {
typedef struct { typedef struct {
uint32_t nCols; uint32_t nCols;
SSchema *pSchema; SSchema* pSchema;
} SSchemaWrapper; } SSchemaWrapper;
static FORCE_INLINE int32_t tEncodeSSchema(void** buf, const SSchema* pSchema) { static FORCE_INLINE int32_t tEncodeSSchema(void** buf, const SSchema* pSchema) {
@ -1861,7 +1859,7 @@ static FORCE_INLINE void* tDecodeSSchema(void* buf, SSchema* pSchema) {
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) { static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedU32(buf, pSW->nCols); tlen += taosEncodeFixedU32(buf, pSW->nCols);
for (int32_t i = 0; i < pSW->nCols; i ++) { for (int32_t i = 0; i < pSW->nCols; i++) {
tlen += tEncodeSSchema(buf, &pSW->pSchema[i]); tlen += tEncodeSSchema(buf, &pSW->pSchema[i]);
} }
return tlen; return tlen;
@ -1869,20 +1867,20 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapp
static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) { static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) {
buf = taosDecodeFixedU32(buf, &pSW->nCols); buf = taosDecodeFixedU32(buf, &pSW->nCols);
pSW->pSchema = (SSchema*) calloc(pSW->nCols, sizeof(SSchema)); pSW->pSchema = (SSchema*)calloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) { if (pSW->pSchema == NULL) {
return NULL; return NULL;
} }
for (int32_t i = 0; i < pSW->nCols; i ++) { for (int32_t i = 0; i < pSW->nCols; i++) {
buf = tDecodeSSchema(buf, &pSW->pSchema[i]); buf = tDecodeSSchema(buf, &pSW->pSchema[i]);
} }
return buf; return buf;
} }
typedef struct { typedef struct {
int64_t uid; int64_t uid;
int32_t numOfRows; int32_t numOfRows;
char* colData; char* colData;
} SMqTbData; } SMqTbData;
typedef struct { typedef struct {
@ -1904,24 +1902,24 @@ typedef struct {
int64_t rspOffset; int64_t rspOffset;
int32_t skipLogNum; int32_t skipLogNum;
int32_t numOfTopics; int32_t numOfTopics;
SArray* pBlockData; //SArray<SSDataBlock> SArray* pBlockData; // SArray<SSDataBlock>
} SMqConsumeRsp; } SMqConsumeRsp;
// one req for one vg+topic // one req for one vg+topic
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
//0: commit only, current offset // 0: commit only, current offset
//1: consume only, poll next offset // 1: consume only, poll next offset
//2: commit current and consume next offset // 2: commit current and consume next offset
int32_t reqType; int32_t reqType;
int64_t reqId; int64_t reqId;
int64_t consumerId; int64_t consumerId;
int64_t blockingTime; int64_t blockingTime;
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
int64_t offset; int64_t offset;
char topic[TSDB_TOPIC_FNAME_LEN]; char topic[TSDB_TOPIC_FNAME_LEN];
} SMqConsumeReq; } SMqConsumeReq;
typedef struct { typedef struct {
@ -1931,7 +1929,7 @@ typedef struct {
typedef struct { typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN]; char topic[TSDB_TOPIC_FNAME_LEN];
SArray* vgs; // SArray<SMqSubVgEp> SArray* vgs; // SArray<SMqSubVgEp>
} SMqSubTopicEp; } SMqSubTopicEp;
typedef struct { typedef struct {
@ -1941,9 +1939,7 @@ typedef struct {
SArray* topics; // SArray<SMqSubTopicEp> SArray* topics; // SArray<SMqSubTopicEp>
} SMqCMGetSubEpRsp; } SMqCMGetSubEpRsp;
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); }
taosArrayDestroy(pSubTopicEp->vgs);
}
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
int32_t tlen = 0; int32_t tlen = 0;
@ -1959,7 +1955,7 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
} }
static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) { static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) {
taosArrayDestroyEx(pRsp->topics, (void (*)(void*)) tDeleteSMqSubTopicEp); taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp);
} }
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
@ -2026,4 +2022,4 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
} }
#endif #endif
#endif /*_TD_COMMON_TAOS_MSG_H_*/ #endif /*_TD_COMMON_TAOS_MSG_H_*/

View File

@ -205,6 +205,117 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
} }
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
SMqConsumeReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int64_t fetchOffset = pReq->offset;
/*int64_t blockingTime = pReq->blockingTime;*/
SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL};
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) {
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
rpcSendResponse(pMsg);
return 0;
}
int sz = taosArrayGetSize(pConsumer->topics);
ASSERT(sz == 1);
STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0);
ASSERT(strcmp(pTopic->topicName, pReq->topic) == 0);
ASSERT(pConsumer->consumerId == consumerId);
if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) {
pTopic->committedOffset = pReq->offset;
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) {
pTopic->committedOffset = pReq->offset - 1;
}
rsp.committedOffset = pTopic->committedOffset;
rsp.reqOffset = pReq->offset;
rsp.skipLogNum = 0;
SWalHead* pHead;
while (1) {
int8_t pos = fetchOffset % TQ_BUFFER_SIZE;
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
break;
}
pHead = pTopic->pReadhandle->pHead;
if (pHead->head.msgType == TDMT_VND_SUBMIT) {
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
qTaskInfo_t task = pTopic->buffer.output[pos].task;
qSetStreamInput(task, pCont);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* pDataBlock;
uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(false);
}
if (pDataBlock == NULL) {
fetchOffset++;
rsp.skipLogNum++;
break;
}
taosArrayPush(pRes, pDataBlock);
rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset;
pTopic->currentOffset = fetchOffset;
rsp.numOfTopics = 1;
rsp.pBlockData = pRes;
int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
}
void* abuf = buf;
tEncodeSMqConsumeRsp(&abuf, &rsp);
taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
} else {
fetchOffset++;
rsp.skipLogNum++;
}
}
int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
}
void* abuf = buf;
tEncodeSMqConsumeRsp(&abuf, &rsp);
rsp.pBlockData = NULL;
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
#if 0
int32_t tqProcessConsumeReqV0(STQ* pTq, SRpcMsg* pMsg) {
SMqConsumeReq* pReq = pMsg->pCont; SMqConsumeReq* pReq = pMsg->pCont;
int64_t reqId = pReq->reqId; int64_t reqId = pReq->reqId;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
@ -265,6 +376,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
break; break;
} }
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
printf("read offset %ld\n", fetchOffset);
// check err // check err
atomic_store_8(&pTopic->buffer.output[pos].status, 0); atomic_store_8(&pTopic->buffer.output[pos].status, 0);
skip = 1; skip = 1;
@ -273,10 +385,10 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
// read until find TDMT_VND_SUBMIT // read until find TDMT_VND_SUBMIT
pHead = pTopic->pReadhandle->pHead; pHead = pTopic->pReadhandle->pHead;
if (pHead->head.msgType == TDMT_VND_SUBMIT) { if (pHead->head.msgType == TDMT_VND_SUBMIT) {
break;
} }
rsp.skipLogNum++; rsp.skipLogNum++;
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
printf("read offset %ld\n", fetchOffset);
atomic_store_8(&pTopic->buffer.output[pos].status, 0); atomic_store_8(&pTopic->buffer.output[pos].status, 0);
skip = 1; skip = 1;
break; break;
@ -288,6 +400,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
qTaskInfo_t task = pTopic->buffer.output[pos].task; qTaskInfo_t task = pTopic->buffer.output[pos].task;
printf("current fetch offset %ld\n", fetchOffset);
qSetStreamInput(task, pCont); qSetStreamInput(task, pCont);
// SArray<SSDataBlock> // SArray<SSDataBlock>
@ -307,6 +420,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
// TODO copy // TODO copy
rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset; rsp.rspOffset = fetchOffset;
pTopic->currentOffset = fetchOffset;
atomic_store_8(&pTopic->buffer.output[pos].status, 0); atomic_store_8(&pTopic->buffer.output[pos].status, 0);
@ -350,6 +464,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
rpcSendResponse(pMsg); rpcSendResponse(pMsg);
return 0; return 0;
} }
#endif
int32_t tqProcessRebReq(STQ* pTq, char* msg) { int32_t tqProcessRebReq(STQ* pTq, char* msg) {
SMqMVRebReq req = {0}; SMqMVRebReq req = {0};

View File

@ -52,12 +52,15 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
ASSERT(pHandle->tbIdHash); ASSERT(pHandle->tbIdHash);
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t)); void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
if (ret != NULL) { if (ret != NULL) {
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
pHandle->pBlock->tid = htonl(pHandle->pBlock->tid); pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);
pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion); pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);
pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen); pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);
pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen); pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);
pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows); pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);
return true; return true;
} else {
/*printf("skip one tb %ld\n", pHandle->pBlock->uid);*/
} }
} }
return false; return false;