Merge pull request #9986 from taosdata/feature/tq
expose interfaces of mq
This commit is contained in:
commit
4ba79b1427
|
@ -192,9 +192,38 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
|
||||||
|
|
||||||
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
|
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
|
||||||
DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
|
DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
|
||||||
|
typedef struct tmq_t tmq_t;
|
||||||
|
typedef struct tmq_conf_t tmq_conf_t;
|
||||||
|
typedef struct tmq_list_t tmq_list_t;
|
||||||
|
|
||||||
|
typedef struct tmq_message_t tmq_message_t;
|
||||||
|
typedef struct tmq_message_topic_t tmq_message_topic_t;
|
||||||
|
typedef struct tmq_message_tb_t tmq_message_tb_t;
|
||||||
|
typedef struct tmq_tb_iter_t tmq_tb_iter_t;
|
||||||
|
typedef struct tmq_message_col_t tmq_message_col_t;
|
||||||
|
typedef struct tmq_col_iter_t tmq_col_iter_t;
|
||||||
|
|
||||||
|
DLL_EXPORT tmq_list_t* tmq_list_new();
|
||||||
|
DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*);
|
||||||
|
|
||||||
|
DLL_EXPORT tmq_conf_t* tmq_conf_new();
|
||||||
|
|
||||||
|
DLL_EXPORT int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value);
|
||||||
|
|
||||||
DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen);
|
DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen);
|
||||||
|
|
||||||
|
DLL_EXPORT tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen);
|
||||||
|
|
||||||
|
DLL_EXPORT TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list);
|
||||||
|
|
||||||
|
DLL_EXPORT tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time);
|
||||||
|
|
||||||
|
DLL_EXPORT int32_t tmq_topic_num(tmq_message_t* msg);
|
||||||
|
DLL_EXPORT char* tmq_get_topic(tmq_message_topic_t* msg);
|
||||||
|
DLL_EXPORT int32_t tmq_get_vgId(tmq_message_topic_t* msg);
|
||||||
|
DLL_EXPORT tmq_message_tb_t* tmq_get_next_tb(tmq_message_topic_t* msg, tmq_tb_iter_t* iter);
|
||||||
|
DLL_EXPORT tmq_message_col_t* tmq_get_next_col(tmq_message_tb_t* msg, tmq_col_iter_t* iter);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -1519,7 +1519,8 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
|
||||||
|
|
||||||
typedef struct SMqSetCVgReq {
|
typedef struct SMqSetCVgReq {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int64_t consumerId;
|
int64_t oldConsumerId;
|
||||||
|
int64_t newConsumerId;
|
||||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||||
char* sql;
|
char* sql;
|
||||||
|
@ -1550,7 +1551,8 @@ static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
|
||||||
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
|
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->vgId);
|
tlen += taosEncodeFixedI32(buf, pReq->vgId);
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
|
tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId);
|
||||||
|
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
|
||||||
tlen += taosEncodeString(buf, pReq->topicName);
|
tlen += taosEncodeString(buf, pReq->topicName);
|
||||||
tlen += taosEncodeString(buf, pReq->cgroup);
|
tlen += taosEncodeString(buf, pReq->cgroup);
|
||||||
tlen += taosEncodeString(buf, pReq->sql);
|
tlen += taosEncodeString(buf, pReq->sql);
|
||||||
|
@ -1562,7 +1564,8 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
|
static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->vgId);
|
buf = taosDecodeFixedI32(buf, &pReq->vgId);
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId);
|
||||||
|
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
|
||||||
buf = taosDecodeStringTo(buf, pReq->topicName);
|
buf = taosDecodeStringTo(buf, pReq->topicName);
|
||||||
buf = taosDecodeStringTo(buf, pReq->cgroup);
|
buf = taosDecodeStringTo(buf, pReq->cgroup);
|
||||||
buf = taosDecodeString(buf, &pReq->sql);
|
buf = taosDecodeString(buf, &pReq->sql);
|
||||||
|
@ -1579,15 +1582,6 @@ typedef struct SMqSetCVgRsp {
|
||||||
char cGroup[TSDB_CONSUMER_GROUP_LEN];
|
char cGroup[TSDB_CONSUMER_GROUP_LEN];
|
||||||
} SMqSetCVgRsp;
|
} SMqSetCVgRsp;
|
||||||
|
|
||||||
typedef struct SMqConsumeReq {
|
|
||||||
int64_t reqId;
|
|
||||||
int64_t offset;
|
|
||||||
int64_t consumerId;
|
|
||||||
int64_t blockingTime;
|
|
||||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
|
||||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
|
||||||
} SMqConsumeReq;
|
|
||||||
|
|
||||||
typedef struct SMqColData {
|
typedef struct SMqColData {
|
||||||
int16_t colId;
|
int16_t colId;
|
||||||
int16_t type;
|
int16_t type;
|
||||||
|
@ -1615,12 +1609,29 @@ typedef struct SMqTopicBlk {
|
||||||
|
|
||||||
typedef struct SMqConsumeRsp {
|
typedef struct SMqConsumeRsp {
|
||||||
int64_t reqId;
|
int64_t reqId;
|
||||||
int64_t clientId;
|
int64_t consumerId;
|
||||||
int32_t bodyLen;
|
int32_t bodyLen;
|
||||||
int32_t numOfTopics;
|
int32_t numOfTopics;
|
||||||
SMqTopicData data[];
|
SMqTopicData data[];
|
||||||
} SMqConsumeRsp;
|
} SMqConsumeRsp;
|
||||||
|
|
||||||
|
// one req for one vg+topic
|
||||||
|
typedef struct SMqConsumeReq {
|
||||||
|
//0: commit only, current offset
|
||||||
|
//1: consume only, poll next offset
|
||||||
|
//2: commit current and consume next offset
|
||||||
|
int32_t reqType;
|
||||||
|
|
||||||
|
int64_t reqId;
|
||||||
|
int64_t consumerId;
|
||||||
|
int64_t blockingTime;
|
||||||
|
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||||
|
|
||||||
|
int64_t offset;
|
||||||
|
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
} SMqConsumeReq;
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -117,7 +117,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
|
||||||
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
|
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
|
||||||
p->mgmtEp = epSet;
|
p->mgmtEp = epSet;
|
||||||
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
|
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
|
||||||
/*p->pAppHbMgr = appHbMgrInit(p);*/
|
p->pAppHbMgr = appHbMgrInit(p);
|
||||||
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
|
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
|
||||||
|
|
||||||
pInst = &p;
|
pInst = &p;
|
||||||
|
@ -254,14 +254,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
|
||||||
return schedulerAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob);
|
return schedulerAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob);
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct tmq_t tmq_t;
|
|
||||||
|
|
||||||
typedef struct SMqClientTopic {
|
typedef struct SMqClientVg {
|
||||||
// subscribe info
|
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
char* topicName;
|
|
||||||
int64_t topicId;
|
|
||||||
// statistics
|
// statistics
|
||||||
int64_t consumeCnt;
|
int64_t consumeCnt;
|
||||||
// offset
|
// offset
|
||||||
|
@ -270,36 +264,160 @@ typedef struct SMqClientTopic {
|
||||||
//connection info
|
//connection info
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
|
} SMqClientVg;
|
||||||
|
|
||||||
|
typedef struct SMqClientTopic {
|
||||||
|
// subscribe info
|
||||||
|
int32_t sqlLen;
|
||||||
|
char* sql;
|
||||||
|
char* topicName;
|
||||||
|
int64_t topicId;
|
||||||
|
int32_t nextVgIdx;
|
||||||
|
SArray* vgs; //SArray<SMqClientVg>
|
||||||
} SMqClientTopic;
|
} SMqClientTopic;
|
||||||
|
|
||||||
typedef struct tmq_resp_err_t {
|
typedef struct tmq_resp_err_t {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
} tmq_resp_err_t;
|
} tmq_resp_err_t;
|
||||||
|
|
||||||
typedef struct tmq_topic_vgroup_list_t {
|
typedef struct tmq_topic_vgroup_t {
|
||||||
char* topicName;
|
char* topic;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int64_t committedOffset;
|
int64_t commitOffset;
|
||||||
|
} tmq_topic_vgroup_t;
|
||||||
|
|
||||||
|
typedef struct tmq_topic_vgroup_list_t {
|
||||||
|
int32_t cnt;
|
||||||
|
int32_t size;
|
||||||
|
tmq_topic_vgroup_t* elems;
|
||||||
} tmq_topic_vgroup_list_t;
|
} tmq_topic_vgroup_list_t;
|
||||||
|
|
||||||
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));
|
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));
|
||||||
|
|
||||||
typedef struct tmq_conf_t{
|
struct tmq_conf_t {
|
||||||
char* clientId;
|
char clientId[256];
|
||||||
char* groupId;
|
char groupId[256];
|
||||||
char* ip;
|
char* ip;
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
tmq_commit_cb* commit_cb;
|
tmq_commit_cb* commit_cb;
|
||||||
} tmq_conf_t;
|
};
|
||||||
|
|
||||||
|
tmq_conf_t* tmq_conf_new() {
|
||||||
|
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
|
||||||
|
if (strcmp(key, "group.id")) {
|
||||||
|
strcpy(conf->groupId, value);
|
||||||
|
}
|
||||||
|
if (strcmp(key, "client.id")) {
|
||||||
|
strcpy(conf->clientId, value);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
struct tmq_t {
|
struct tmq_t {
|
||||||
char groupId[256];
|
char groupId[256];
|
||||||
char clientId[256];
|
char clientId[256];
|
||||||
|
int64_t consumerId;
|
||||||
|
int64_t status;
|
||||||
STscObj* pTscObj;
|
STscObj* pTscObj;
|
||||||
tmq_commit_cb* commit_cb;
|
tmq_commit_cb* commit_cb;
|
||||||
|
int32_t nextTopicIdx;
|
||||||
SArray* clientTopics; //SArray<SMqClientTopic>
|
SArray* clientTopics; //SArray<SMqClientTopic>
|
||||||
};
|
};
|
||||||
|
|
||||||
|
tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
|
tmq_t* pTmq = calloc(sizeof(tmq_t), 1);
|
||||||
|
if (pTmq == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pTmq->pTscObj = (STscObj*)conn;
|
||||||
|
pTmq->status = 0;
|
||||||
|
strcpy(pTmq->clientId, conf->clientId);
|
||||||
|
strcpy(pTmq->groupId, conf->groupId);
|
||||||
|
pTmq->commit_cb = conf->commit_cb;
|
||||||
|
pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1);
|
||||||
|
return pTmq;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct tmq_list_t {
|
||||||
|
int32_t cnt;
|
||||||
|
int32_t tot;
|
||||||
|
char* elems[];
|
||||||
|
};
|
||||||
|
tmq_list_t* tmq_list_new() {
|
||||||
|
tmq_list_t *ptr = malloc(sizeof(tmq_list_t) + 8 * sizeof(char*));
|
||||||
|
if (ptr == NULL) {
|
||||||
|
return ptr;
|
||||||
|
}
|
||||||
|
ptr->cnt = 0;
|
||||||
|
ptr->tot = 8;
|
||||||
|
return ptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tmq_list_append(tmq_list_t* ptr, char* src) {
|
||||||
|
if (ptr->cnt >= ptr->tot-1) return -1;
|
||||||
|
ptr->elems[ptr->cnt] = src;
|
||||||
|
ptr->cnt++;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
||||||
|
SRequestObj *pRequest = NULL;
|
||||||
|
tmq->status = 1;
|
||||||
|
int32_t sz = topic_list->cnt;
|
||||||
|
tmq->clientTopics = taosArrayInit(sz, sizeof(void*));
|
||||||
|
for (int i = 0; i < sz; i++) {
|
||||||
|
char* topicName = strdup(topic_list->elems[i]);
|
||||||
|
taosArrayPush(tmq->clientTopics, &topicName);
|
||||||
|
}
|
||||||
|
SCMSubscribeReq req;
|
||||||
|
req.topicNum = taosArrayGetSize(tmq->clientTopics);
|
||||||
|
req.consumerId = tmq->consumerId;
|
||||||
|
req.consumerGroup = strdup(tmq->groupId);
|
||||||
|
req.topicNames = tmq->clientTopics;
|
||||||
|
|
||||||
|
int tlen = tSerializeSCMSubscribeReq(NULL, &req);
|
||||||
|
void* buf = malloc(tlen);
|
||||||
|
if(buf == NULL) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* abuf = buf;
|
||||||
|
tSerializeSCMSubscribeReq(&abuf, &req);
|
||||||
|
/*printf("formatted: %s\n", dagStr);*/
|
||||||
|
|
||||||
|
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TSDB_SQL_SELECT);
|
||||||
|
if (pRequest == NULL) {
|
||||||
|
tscError("failed to malloc sqlObj");
|
||||||
|
}
|
||||||
|
|
||||||
|
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
|
||||||
|
pRequest->type = TDMT_MND_CREATE_TOPIC;
|
||||||
|
|
||||||
|
SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
|
||||||
|
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||||
|
|
||||||
|
int64_t transporterId = 0;
|
||||||
|
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
|
||||||
|
|
||||||
|
tsem_wait(&pRequest->body.rspSem);
|
||||||
|
|
||||||
|
_return:
|
||||||
|
if (body != NULL) {
|
||||||
|
destroySendMsgInfo(body);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
|
||||||
|
pRequest->code = terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pRequest;
|
||||||
|
}
|
||||||
|
|
||||||
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
|
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
|
||||||
conf->commit_cb = cb;
|
conf->commit_cb = cb;
|
||||||
}
|
}
|
||||||
|
@ -327,10 +445,10 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
|
||||||
int sz = taosArrayGetSize(clientTopics);
|
int sz = taosArrayGetSize(clientTopics);
|
||||||
for (int i = 0; i < sz; i++) {
|
for (int i = 0; i < sz; i++) {
|
||||||
SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i);
|
SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i);
|
||||||
if (pCTopic->vgId == -1) {
|
/*if (pCTopic->vgId == -1) {*/
|
||||||
pMqHb->status = 1;
|
/*pMqHb->status = 1;*/
|
||||||
break;
|
/*break;*/
|
||||||
}
|
/*}*/
|
||||||
}
|
}
|
||||||
kv.value = pMqHb;
|
kv.value = pMqHb;
|
||||||
kv.valueLen = sizeof(SMqHbMsg);
|
kv.valueLen = sizeof(SMqHbMsg);
|
||||||
|
@ -451,22 +569,63 @@ _return:
|
||||||
return pRequest;
|
return pRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct tmq_message_t {
|
/*typedef SMqConsumeRsp tmq_message_t;*/
|
||||||
int32_t numOfRows;
|
|
||||||
char* topicName;
|
|
||||||
TAOS_ROW row[];
|
|
||||||
} tmq_message_t;
|
|
||||||
|
|
||||||
tmq_message_t* tmq_consume_poll(tmq_t* mq, int64_t blocking_time) {
|
struct tmq_message_t {
|
||||||
|
SMqConsumeRsp rsp;
|
||||||
|
};
|
||||||
|
|
||||||
|
tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
|
||||||
|
if (tmq->clientTopics == NULL || taosArrayGetSize(tmq->clientTopics) == 0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
SRequestObj *pRequest = NULL;
|
||||||
|
SMqConsumeReq req = {0};
|
||||||
|
req.reqType = 1;
|
||||||
|
req.blockingTime = blocking_time;
|
||||||
|
req.consumerId = tmq->consumerId;
|
||||||
|
strcpy(req.cgroup, tmq->groupId);
|
||||||
|
|
||||||
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
|
||||||
|
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
|
||||||
|
strcpy(req.topic, pTopic->topicName);
|
||||||
|
int32_t nextVgIdx = pTopic->nextVgIdx;
|
||||||
|
pTopic->nextVgIdx = (nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
|
||||||
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, nextVgIdx);
|
||||||
|
req.offset = pVg->currentOffset;
|
||||||
|
|
||||||
|
pRequest->body.requestMsg = (SDataBuf){ .pData = &req, .len = sizeof(SMqConsumeReq) };
|
||||||
|
pRequest->type = TDMT_VND_CONSUME;
|
||||||
|
|
||||||
|
SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
|
||||||
|
|
||||||
|
int64_t transporterId = 0;
|
||||||
|
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, body);
|
||||||
|
|
||||||
|
tsem_wait(&pRequest->body.rspSem);
|
||||||
|
|
||||||
|
return (tmq_message_t*)pRequest->body.resInfo.pData;
|
||||||
|
|
||||||
|
/*tsem_wait(&pRequest->body.rspSem);*/
|
||||||
|
|
||||||
|
/*if (body != NULL) {*/
|
||||||
|
/*destroySendMsgInfo(body);*/
|
||||||
|
/*}*/
|
||||||
|
|
||||||
|
/*if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {*/
|
||||||
|
/*pRequest->code = terrno;*/
|
||||||
|
/*}*/
|
||||||
|
|
||||||
|
/*return pRequest;*/
|
||||||
|
}
|
||||||
|
|
||||||
|
tmq_resp_err_t* tmq_commit(tmq_t* tmq, tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
|
||||||
|
SMqConsumeReq req = {0};
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_resp_err_t* tmq_commit(tmq_t* mq, void* callback, int32_t async) {
|
void tmq_message_destroy(tmq_message_t* tmq_message) {
|
||||||
return NULL;
|
if (tmq_message == NULL) return;
|
||||||
}
|
|
||||||
|
|
||||||
void tmq_message_destroy(tmq_message_t* mq_message) {
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -72,7 +72,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
|
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
|
||||||
|
|
||||||
SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY};
|
SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY};
|
||||||
/*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);*/
|
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);
|
||||||
|
|
||||||
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
|
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
|
||||||
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
|
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
|
||||||
|
|
|
@ -526,29 +526,54 @@ TEST(testCase, show_table_Test) {
|
||||||
// taosHashCleanup(phash);
|
// taosHashCleanup(phash);
|
||||||
//}
|
//}
|
||||||
//
|
//
|
||||||
//TEST(testCase, create_topic_Test) {
|
TEST(testCase, create_topic_Test) {
|
||||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
// assert(pConn != NULL);
|
assert(pConn != NULL);
|
||||||
//
|
|
||||||
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||||
// if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
// printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||||
// }
|
}
|
||||||
// taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
//
|
|
||||||
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||||
// ASSERT_TRUE(pFields == nullptr);
|
ASSERT_TRUE(pFields == nullptr);
|
||||||
//
|
|
||||||
// int32_t numOfFields = taos_num_fields(pRes);
|
int32_t numOfFields = taos_num_fields(pRes);
|
||||||
// ASSERT_EQ(numOfFields, 0);
|
ASSERT_EQ(numOfFields, 0);
|
||||||
//
|
|
||||||
// taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
//
|
|
||||||
// char* sql = "select * from tu";
|
char* sql = "select * from tu";
|
||||||
// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
|
pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
|
||||||
// taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
// taos_close(pConn);
|
taos_close(pConn);
|
||||||
//}
|
}
|
||||||
|
|
||||||
|
TEST(testCase, tmq_subscribe_Test) {
|
||||||
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
assert(pConn != NULL);
|
||||||
|
|
||||||
|
tmq_conf_t* conf = tmq_conf_new();
|
||||||
|
tmq_conf_set(conf, "group.id", "tg1");
|
||||||
|
tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0);
|
||||||
|
|
||||||
|
tmq_list_t* topic_list = tmq_list_new();
|
||||||
|
tmq_list_append(topic_list, "test_topic_1");
|
||||||
|
tmq_subscribe(tmq, topic_list);
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
tmq_message_t* msg = tmq_consume_poll(tmq, 0);
|
||||||
|
printf("get msg\n");
|
||||||
|
if (msg == NULL) break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(testCase, tmq_consume_Test) {
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(testCase, tmq_commit_TEST) {
|
||||||
|
}
|
||||||
|
|
||||||
//TEST(testCase, insert_test) {
|
//TEST(testCase, insert_test) {
|
||||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
|
|
@ -19,14 +19,14 @@
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
#include "cJSON.h"
|
#include "cJSON.h"
|
||||||
|
#include "scheduler.h"
|
||||||
#include "sync.h"
|
#include "sync.h"
|
||||||
#include "tmsg.h"
|
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "tlist.h"
|
#include "tlist.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
#include "tmsg.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "scheduler.h"
|
|
||||||
|
|
||||||
#include "mnode.h"
|
#include "mnode.h"
|
||||||
|
|
||||||
|
@ -37,12 +37,42 @@ extern "C" {
|
||||||
extern int32_t mDebugFlag;
|
extern int32_t mDebugFlag;
|
||||||
|
|
||||||
// mnode log function
|
// mnode log function
|
||||||
#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", 255, __VA_ARGS__); }}
|
#define mFatal(...) \
|
||||||
#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", 255, __VA_ARGS__); }}
|
{ \
|
||||||
#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", 255, __VA_ARGS__); }}
|
if (mDebugFlag & DEBUG_FATAL) { \
|
||||||
#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", 255, __VA_ARGS__); }}
|
taosPrintLog("MND FATAL ", 255, __VA_ARGS__); \
|
||||||
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }}
|
} \
|
||||||
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }}
|
}
|
||||||
|
#define mError(...) \
|
||||||
|
{ \
|
||||||
|
if (mDebugFlag & DEBUG_ERROR) { \
|
||||||
|
taosPrintLog("MND ERROR ", 255, __VA_ARGS__); \
|
||||||
|
} \
|
||||||
|
}
|
||||||
|
#define mWarn(...) \
|
||||||
|
{ \
|
||||||
|
if (mDebugFlag & DEBUG_WARN) { \
|
||||||
|
taosPrintLog("MND WARN ", 255, __VA_ARGS__); \
|
||||||
|
} \
|
||||||
|
}
|
||||||
|
#define mInfo(...) \
|
||||||
|
{ \
|
||||||
|
if (mDebugFlag & DEBUG_INFO) { \
|
||||||
|
taosPrintLog("MND ", 255, __VA_ARGS__); \
|
||||||
|
} \
|
||||||
|
}
|
||||||
|
#define mDebug(...) \
|
||||||
|
{ \
|
||||||
|
if (mDebugFlag & DEBUG_DEBUG) { \
|
||||||
|
taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); \
|
||||||
|
} \
|
||||||
|
}
|
||||||
|
#define mTrace(...) \
|
||||||
|
{ \
|
||||||
|
if (mDebugFlag & DEBUG_TRACE) { \
|
||||||
|
taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); \
|
||||||
|
} \
|
||||||
|
}
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
MND_AUTH_ACCT_START = 0,
|
MND_AUTH_ACCT_START = 0,
|
||||||
|
@ -328,6 +358,7 @@ typedef struct SMqTopicConsumer {
|
||||||
|
|
||||||
typedef struct SMqConsumerEp {
|
typedef struct SMqConsumerEp {
|
||||||
int32_t vgId; // -1 for unassigned
|
int32_t vgId; // -1 for unassigned
|
||||||
|
int32_t status;
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
int64_t consumerId; // -1 for unassigned
|
int64_t consumerId; // -1 for unassigned
|
||||||
int64_t lastConsumerHbTs;
|
int64_t lastConsumerHbTs;
|
||||||
|
@ -339,6 +370,7 @@ typedef struct SMqConsumerEp {
|
||||||
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
|
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId);
|
tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pConsumerEp->status);
|
||||||
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet);
|
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet);
|
||||||
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
|
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
|
||||||
tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
|
tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
|
||||||
|
@ -347,6 +379,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
|
static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
|
||||||
buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId);
|
buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId);
|
||||||
|
buf = taosDecodeFixedI32(buf, &pConsumerEp->status);
|
||||||
buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet);
|
buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet);
|
||||||
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
|
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
|
||||||
buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
|
buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
|
||||||
|
@ -362,7 +395,8 @@ typedef struct SMqSubscribeObj {
|
||||||
int32_t nextConsumerIdx;
|
int32_t nextConsumerIdx;
|
||||||
SArray* availConsumer; // SArray<int64_t> (consumerId)
|
SArray* availConsumer; // SArray<int64_t> (consumerId)
|
||||||
SArray* assigned; // SArray<SMqConsumerEp>
|
SArray* assigned; // SArray<SMqConsumerEp>
|
||||||
SArray* unassignedConsumer; // SArray<SMqConsumerEp>
|
SArray* idleConsumer; // SArray<SMqConsumerEp>
|
||||||
|
SArray* lostConsumer; // SArray<SMqConsumerEp>
|
||||||
SArray* unassignedVg; // SArray<SMqConsumerEp>
|
SArray* unassignedVg; // SArray<SMqConsumerEp>
|
||||||
} SMqSubscribeObj;
|
} SMqSubscribeObj;
|
||||||
|
|
||||||
|
@ -384,17 +418,17 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
|
||||||
free(pSub);
|
free(pSub);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pSub->unassignedConsumer = taosArrayInit(0, sizeof(SMqConsumerEp));
|
pSub->idleConsumer = taosArrayInit(0, sizeof(SMqConsumerEp));
|
||||||
if (pSub->assigned == NULL) {
|
if (pSub->assigned == NULL) {
|
||||||
taosArrayDestroy(pSub->availConsumer);
|
taosArrayDestroy(pSub->availConsumer);
|
||||||
taosArrayDestroy(pSub->unassignedConsumer);
|
taosArrayDestroy(pSub->idleConsumer);
|
||||||
free(pSub);
|
free(pSub);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp));
|
pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp));
|
||||||
if (pSub->assigned == NULL) {
|
if (pSub->assigned == NULL) {
|
||||||
taosArrayDestroy(pSub->availConsumer);
|
taosArrayDestroy(pSub->availConsumer);
|
||||||
taosArrayDestroy(pSub->unassignedConsumer);
|
taosArrayDestroy(pSub->idleConsumer);
|
||||||
taosArrayDestroy(pSub->unassignedVg);
|
taosArrayDestroy(pSub->unassignedVg);
|
||||||
free(pSub);
|
free(pSub);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -422,10 +456,10 @@ static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeOb
|
||||||
tlen += tEncodeSMqConsumerEp(buf, pCEp);
|
tlen += tEncodeSMqConsumerEp(buf, pCEp);
|
||||||
}
|
}
|
||||||
|
|
||||||
sz = taosArrayGetSize(pSub->unassignedConsumer);
|
sz = taosArrayGetSize(pSub->idleConsumer);
|
||||||
tlen += taosEncodeFixedI32(buf, sz);
|
tlen += taosEncodeFixedI32(buf, sz);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqConsumerEp* pCEp = taosArrayGet(pSub->unassignedConsumer, i);
|
SMqConsumerEp* pCEp = taosArrayGet(pSub->idleConsumer, i);
|
||||||
tlen += tEncodeSMqConsumerEp(buf, pCEp);
|
tlen += tEncodeSMqConsumerEp(buf, pCEp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -457,22 +491,22 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
|
||||||
}
|
}
|
||||||
|
|
||||||
buf = taosDecodeFixedI32(buf, &sz);
|
buf = taosDecodeFixedI32(buf, &sz);
|
||||||
pSub->unassignedConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp));
|
pSub->idleConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp));
|
||||||
if (pSub->unassignedConsumer == NULL) {
|
if (pSub->idleConsumer == NULL) {
|
||||||
taosArrayDestroy(pSub->assigned);
|
taosArrayDestroy(pSub->assigned);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqConsumerEp cEp;
|
SMqConsumerEp cEp;
|
||||||
buf = tDecodeSMqConsumerEp(buf, &cEp);
|
buf = tDecodeSMqConsumerEp(buf, &cEp);
|
||||||
taosArrayPush(pSub->unassignedConsumer, &cEp);
|
taosArrayPush(pSub->idleConsumer, &cEp);
|
||||||
}
|
}
|
||||||
|
|
||||||
buf = taosDecodeFixedI32(buf, &sz);
|
buf = taosDecodeFixedI32(buf, &sz);
|
||||||
pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp));
|
pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp));
|
||||||
if (pSub->unassignedVg == NULL) {
|
if (pSub->unassignedVg == NULL) {
|
||||||
taosArrayDestroy(pSub->assigned);
|
taosArrayDestroy(pSub->assigned);
|
||||||
taosArrayDestroy(pSub->unassignedConsumer);
|
taosArrayDestroy(pSub->idleConsumer);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
@ -512,13 +546,12 @@ typedef struct SMqTopicObj {
|
||||||
typedef struct SMqConsumerTopic {
|
typedef struct SMqConsumerTopic {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
//TODO: replace with something with ep
|
|
||||||
//SList *vgroups; // SList<int32_t>
|
|
||||||
// vg assigned to the consumer on the topic
|
// vg assigned to the consumer on the topic
|
||||||
SArray* pVgInfo; // SArray<int32_t>
|
SArray* pVgInfo; // SArray<int32_t>
|
||||||
} SMqConsumerTopic;
|
} SMqConsumerTopic;
|
||||||
|
|
||||||
static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
|
static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqTopicObj* pTopic,
|
||||||
|
SMqSubscribeObj* pSub) {
|
||||||
SMqConsumerTopic* pCTopic = malloc(sizeof(SMqConsumerTopic));
|
SMqConsumerTopic* pCTopic = malloc(sizeof(SMqConsumerTopic));
|
||||||
if (pCTopic == NULL) {
|
if (pCTopic == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -567,6 +600,7 @@ static FORCE_INLINE void* tDecodeSMqConsumerTopic(void* buf, SMqConsumerTopic* p
|
||||||
|
|
||||||
typedef struct SMqConsumerObj {
|
typedef struct SMqConsumerObj {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
|
int64_t connId;
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||||
SArray* topics; // SArray<SMqConsumerTopic>
|
SArray* topics; // SArray<SMqConsumerTopic>
|
||||||
|
|
|
@ -98,7 +98,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
||||||
// build msg
|
// build msg
|
||||||
SMqSetCVgReq req = {
|
SMqSetCVgReq req = {
|
||||||
.vgId = pCEp->vgId,
|
.vgId = pCEp->vgId,
|
||||||
.consumerId = consumerId,
|
.oldConsumerId = -1,
|
||||||
|
.newConsumerId = consumerId,
|
||||||
};
|
};
|
||||||
strcpy(req.cgroup, cgroup);
|
strcpy(req.cgroup, cgroup);
|
||||||
strcpy(req.topicName, topic);
|
strcpy(req.topicName, topic);
|
||||||
|
@ -152,6 +153,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
|
||||||
//convert dag to msg
|
//convert dag to msg
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqConsumerEp CEp;
|
SMqConsumerEp CEp;
|
||||||
|
CEp.status = 0;
|
||||||
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
|
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
|
||||||
STaskInfo* pTaskInfo = taosArrayGet(pArray, i);
|
STaskInfo* pTaskInfo = taosArrayGet(pArray, i);
|
||||||
tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr);
|
tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr);
|
||||||
|
@ -171,7 +173,8 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
|
||||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||||
SMqSetCVgReq req = {
|
SMqSetCVgReq req = {
|
||||||
.vgId = vgId,
|
.vgId = vgId,
|
||||||
.consumerId = pConsumer->consumerId,
|
.oldConsumerId = -1,
|
||||||
|
.newConsumerId = pConsumer->consumerId,
|
||||||
};
|
};
|
||||||
strcpy(req.cgroup, pConsumer->cgroup);
|
strcpy(req.cgroup, pConsumer->cgroup);
|
||||||
strcpy(req.topicName, pTopic->name);
|
strcpy(req.topicName, pTopic->name);
|
||||||
|
@ -451,12 +454,13 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
|
|
||||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
|
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
|
||||||
if (pTopic == NULL) {
|
if (pTopic == NULL) {
|
||||||
/*terrno = */
|
mError("topic being subscribed not exist: %s", newTopicName);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, newTopicName);
|
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, newTopicName);
|
||||||
if (pSub == NULL) {
|
if (pSub == NULL) {
|
||||||
|
mDebug("create new subscription, group: %s, topic %s", consumerGroup, newTopicName);
|
||||||
pSub = tNewSubscribeObj();
|
pSub = tNewSubscribeObj();
|
||||||
if (pSub == NULL) {
|
if (pSub == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -464,14 +468,15 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
// set unassigned vg
|
// set unassigned vg
|
||||||
mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg);
|
mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg);
|
||||||
|
//TODO: disable alter
|
||||||
}
|
}
|
||||||
taosArrayPush(pSub->availConsumer, &consumerId);
|
taosArrayPush(pSub->availConsumer, &consumerId);
|
||||||
|
|
||||||
// TODO: no need
|
|
||||||
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub);
|
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub);
|
||||||
taosArrayPush(pConsumer->topics, pConsumerTopic);
|
taosArrayPush(pConsumer->topics, pConsumerTopic);
|
||||||
|
|
||||||
if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) {
|
if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) {
|
||||||
|
ASSERT(taosArrayGetSize(pConsumerTopic->pVgInfo) == 1);
|
||||||
int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo);
|
int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo);
|
||||||
// send setmsg to vnode
|
// send setmsg to vnode
|
||||||
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic) < 0) {
|
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic) < 0) {
|
||||||
|
@ -479,8 +484,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pConsumerTopic->pVgInfo);
|
|
||||||
free(pConsumerTopic);
|
|
||||||
SSdbRaw *pRaw = mndSubActionEncode(pSub);
|
SSdbRaw *pRaw = mndSubActionEncode(pSub);
|
||||||
/*sdbSetRawStatus(pRaw, SDB_STATUS_READY);*/
|
/*sdbSetRawStatus(pRaw, SDB_STATUS_READY);*/
|
||||||
mndTransAppendRedolog(pTrans, pRaw);
|
mndTransAppendRedolog(pTrans, pRaw);
|
||||||
|
@ -533,12 +537,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
|
if (newSub) taosArrayDestroy(newSub);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: free memory
|
|
||||||
if (newSub) taosArrayDestroy(newSub);
|
if (newSub) taosArrayDestroy(newSub);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
|
|
||||||
#include "common.h"
|
#include "common.h"
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
#include "vnode.h"
|
|
||||||
#include "mallocator.h"
|
#include "mallocator.h"
|
||||||
#include "meta.h"
|
#include "meta.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
@ -29,6 +28,7 @@
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
#include "vnode.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
@ -153,6 +153,7 @@ typedef struct STqTaskItem {
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
void* dst;
|
void* dst;
|
||||||
qTaskInfo_t task;
|
qTaskInfo_t task;
|
||||||
|
SSubQueryMsg* pQueryMsg;
|
||||||
} STqTaskItem;
|
} STqTaskItem;
|
||||||
|
|
||||||
// new version
|
// new version
|
||||||
|
@ -164,7 +165,6 @@ typedef struct STqBuffer {
|
||||||
|
|
||||||
typedef struct STqTopicHandle {
|
typedef struct STqTopicHandle {
|
||||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||||
char cgroup[TSDB_TOPIC_FNAME_LEN];
|
|
||||||
char* sql;
|
char* sql;
|
||||||
char* logicalPlan;
|
char* logicalPlan;
|
||||||
char* physicalPlan;
|
char* physicalPlan;
|
||||||
|
@ -177,6 +177,7 @@ typedef struct STqTopicHandle {
|
||||||
typedef struct STqConsumerHandle {
|
typedef struct STqConsumerHandle {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
int64_t epoch;
|
int64_t epoch;
|
||||||
|
char cgroup[TSDB_TOPIC_FNAME_LEN];
|
||||||
SArray* topics; // SArray<STqClientTopic>
|
SArray* topics; // SArray<STqClientTopic>
|
||||||
} STqConsumerHandle;
|
} STqConsumerHandle;
|
||||||
|
|
||||||
|
@ -318,7 +319,7 @@ int tqSendLaunchQuery(STqMsgItem*, int64_t offset);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp);
|
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp);
|
||||||
int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq);
|
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,8 +44,10 @@ extern int32_t tqDebugFlag;
|
||||||
// delete persistent storage for meta info
|
// delete persistent storage for meta info
|
||||||
// int tqDropTCGroup(STQ*, const char* topic, int cgId);
|
// int tqDropTCGroup(STQ*, const char* topic, int cgId);
|
||||||
|
|
||||||
int tqSerializeGroup(const STqGroup*, STqSerializedHead**);
|
//int tqSerializeGroup(const STqGroup*, STqSerializedHead**);
|
||||||
const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup);
|
//const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup);
|
||||||
|
int tqSerializeConsumer(const STqConsumerHandle*, STqSerializedHead**);
|
||||||
|
const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle**);
|
||||||
static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; }
|
static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; }
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,7 +68,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl
|
||||||
// TODO: error code of buffer pool
|
// TODO: error code of buffer pool
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0);
|
pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, free, 0);
|
||||||
if (pTq->tqMeta == NULL) {
|
if (pTq->tqMeta == NULL) {
|
||||||
free(pTq);
|
free(pTq);
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -478,6 +478,59 @@ int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
int tqSerializeConsumer(const STqConsumerHandle* pConsumer, STqSerializedHead** ppHead) {
|
||||||
|
int32_t num = taosArrayGetSize(pConsumer->topics);
|
||||||
|
int32_t sz = sizeof(STqSerializedHead) + sizeof(int64_t) * 2 + TSDB_TOPIC_FNAME_LEN + num * (sizeof(int64_t) + TSDB_TOPIC_FNAME_LEN);
|
||||||
|
if (sz > (*ppHead)->ssize) {
|
||||||
|
void* tmpPtr = realloc(*ppHead, sz);
|
||||||
|
if (tmpPtr == NULL) {
|
||||||
|
free(*ppHead);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
*ppHead = tmpPtr;
|
||||||
|
(*ppHead)->ssize = sz;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* ptr = (*ppHead)->content;
|
||||||
|
*(int64_t*)ptr = pConsumer->consumerId;
|
||||||
|
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
||||||
|
*(int64_t*)ptr = pConsumer->epoch;
|
||||||
|
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
||||||
|
memcpy(ptr, pConsumer->topics, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
ptr = POINTER_SHIFT(ptr, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
*(int32_t*)ptr = num;
|
||||||
|
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
|
||||||
|
for (int32_t i = 0; i < num; i++) {
|
||||||
|
STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);
|
||||||
|
memcpy(ptr, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
ptr = POINTER_SHIFT(ptr, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
*(int64_t*)ptr = pTopic->committedOffset;
|
||||||
|
POINTER_SHIFT(ptr, sizeof(int64_t));
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle** ppConsumer) {
|
||||||
|
STqConsumerHandle* pConsumer = *ppConsumer;
|
||||||
|
const void* ptr = pHead->content;
|
||||||
|
pConsumer->consumerId = *(int64_t*)ptr;
|
||||||
|
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
||||||
|
pConsumer->epoch = *(int64_t*)ptr;
|
||||||
|
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
||||||
|
memcpy(pConsumer->cgroup, ptr, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
ptr = POINTER_SHIFT(ptr, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
int32_t sz = *(int32_t*)ptr;
|
||||||
|
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
|
||||||
|
pConsumer->topics = taosArrayInit(sz, sizeof(STqTopicHandle));
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
/*STqTopicHandle* topicHandle = */
|
||||||
|
/*taosArrayPush(pConsumer->topics, );*/
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) {
|
int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) {
|
||||||
// calculate size
|
// calculate size
|
||||||
int sz = tqGroupSSize(pGroup) + sizeof(STqSerializedHead);
|
int sz = tqGroupSSize(pGroup) + sizeof(STqSerializedHead);
|
||||||
|
@ -608,6 +661,7 @@ int tqItemSSize() {
|
||||||
// mainly for executor
|
// mainly for executor
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
|
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
|
||||||
SMqConsumeReq* pReq = pMsg->pCont;
|
SMqConsumeReq* pReq = pMsg->pCont;
|
||||||
|
@ -625,7 +679,14 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
|
||||||
|
|
||||||
for (int i = 0; i < sz; i++) {
|
for (int i = 0; i < sz; i++) {
|
||||||
STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);
|
STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);
|
||||||
|
//TODO: support multiple topic in one req
|
||||||
|
if (strcmp(pTopic->topicName, pReq->topic) != 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fetchOffset == -1) {
|
||||||
|
fetchOffset = pTopic->committedOffset + 1;
|
||||||
|
}
|
||||||
int8_t pos;
|
int8_t pos;
|
||||||
int8_t skip = 0;
|
int8_t skip = 0;
|
||||||
SWalHead* pHead;
|
SWalHead* pHead;
|
||||||
|
@ -670,6 +731,23 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (pDataBlock != NULL) {
|
if (pDataBlock != NULL) {
|
||||||
|
SMqTbData tbData = {
|
||||||
|
.uid = pDataBlock->info.uid,
|
||||||
|
.numOfCols = pDataBlock->info.numOfCols,
|
||||||
|
.numOfRows = pDataBlock->info.rows,
|
||||||
|
};
|
||||||
|
for (int i = 0; i < pDataBlock->info.numOfCols; i++) {
|
||||||
|
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||||
|
int32_t sz = pColData->info.bytes * pDataBlock->info.rows;
|
||||||
|
SMqColData colData = {
|
||||||
|
.bytes = pColData->info.bytes,
|
||||||
|
.colId = pColData->info.colId,
|
||||||
|
.type = pColData->info.type,
|
||||||
|
};
|
||||||
|
memcpy(colData.data, pColData->pData, colData.bytes * pDataBlock->info.rows);
|
||||||
|
memcpy(&tbData.colData[i], &colData, sz);
|
||||||
|
}
|
||||||
|
/*pDataBlock->info.*/
|
||||||
taosArrayPush(pRes, pDataBlock);
|
taosArrayPush(pRes, pDataBlock);
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
|
@ -692,29 +770,34 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
|
||||||
pTopic->buffer.lastOffset = pReq->offset;
|
pTopic->buffer.lastOffset = pReq->offset;
|
||||||
}
|
}
|
||||||
// put output into rsp
|
// put output into rsp
|
||||||
|
SMqConsumeRsp rsp = {
|
||||||
|
.consumerId = consumerId,
|
||||||
|
.numOfTopics = 1
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
// launch query
|
|
||||||
// get result
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
|
int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
|
||||||
|
SMqSetCVgReq req;
|
||||||
|
tDecodeSMqSetCVgReq(msg, &req);
|
||||||
STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1);
|
STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1);
|
||||||
if (pConsumer == NULL) {
|
if (pConsumer == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
strcpy(pConsumer->cgroup, req.cgroup);
|
||||||
|
pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle));
|
||||||
|
|
||||||
STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1);
|
STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1);
|
||||||
if (pTopic == NULL) {
|
if (pTopic == NULL) {
|
||||||
free(pConsumer);
|
free(pConsumer);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
strcpy(pTopic->topicName, pReq->topicName);
|
strcpy(pTopic->topicName, req.topicName);
|
||||||
strcpy(pTopic->cgroup, pReq->cgroup);
|
strcpy(pTopic->sql, req.sql);
|
||||||
strcpy(pTopic->sql, pReq->sql);
|
strcpy(pTopic->logicalPlan, req.logicalPlan);
|
||||||
strcpy(pTopic->logicalPlan, pReq->logicalPlan);
|
strcpy(pTopic->physicalPlan, req.physicalPlan);
|
||||||
strcpy(pTopic->physicalPlan, pReq->physicalPlan);
|
|
||||||
|
|
||||||
pTopic->buffer.firstOffset = -1;
|
pTopic->buffer.firstOffset = -1;
|
||||||
pTopic->buffer.lastOffset = -1;
|
pTopic->buffer.lastOffset = -1;
|
||||||
|
@ -724,9 +807,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
|
||||||
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
||||||
pTopic->buffer.output[i].status = 0;
|
pTopic->buffer.output[i].status = 0;
|
||||||
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
|
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
|
||||||
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&pReq->msg, pReadHandle);
|
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&req.msg, pReadHandle);
|
||||||
}
|
}
|
||||||
// write mq meta
|
taosArrayPush(pConsumer->topics, pTopic);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -112,9 +112,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_MQ_SET_CONN: {
|
case TDMT_VND_MQ_SET_CONN: {
|
||||||
SMqSetCVgReq req;
|
if (tqProcessSetConnReq(pVnode->pTq, ptr) < 0) {
|
||||||
tDecodeSMqSetCVgReq(ptr, &req);
|
|
||||||
if (tqProcessSetConnReq(pVnode->pTq, &req) < 0) {
|
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
default:
|
default:
|
||||||
|
|
Loading…
Reference in New Issue