Merge branch '3.0' into test/jcy
This commit is contained in:
commit
1a621182cc
|
@ -234,7 +234,7 @@ DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errst
|
||||||
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
|
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
|
||||||
|
|
||||||
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
|
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list);
|
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
|
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
|
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
|
||||||
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
|
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
|
||||||
|
|
|
@ -1333,7 +1333,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
SArray* topicNames; // SArray<char*>
|
SArray* topicNames; // SArray<char**>
|
||||||
} SCMSubscribeReq;
|
} SCMSubscribeReq;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
|
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
|
||||||
|
|
|
@ -145,7 +145,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
|
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
|
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-mq-ask-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, SMTimerReq)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, SMTimerReq)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mnode-mq-consumer-lost", SMTimerReq, SMTimerReq)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mnode-mq-consumer-lost", SMTimerReq, SMTimerReq)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
|
||||||
|
|
|
@ -41,10 +41,6 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *han
|
||||||
|
|
||||||
void taosTmrCleanUp(void *handle);
|
void taosTmrCleanUp(void *handle);
|
||||||
|
|
||||||
int32_t taosInitTimer(void (*callback)(int32_t), int32_t ms);
|
|
||||||
|
|
||||||
void taosUninitTimer();
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -23,6 +23,9 @@
|
||||||
#include "tmsgtype.h"
|
#include "tmsgtype.h"
|
||||||
#include "tqueue.h"
|
#include "tqueue.h"
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
|
#include "ttimer.h"
|
||||||
|
|
||||||
|
int32_t tmqAskEp(tmq_t* tmq, bool sync);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t tmqRspType;
|
int8_t tmqRspType;
|
||||||
|
@ -61,29 +64,40 @@ struct tmq_conf_t {
|
||||||
tmq_commit_cb* commit_cb;
|
tmq_commit_cb* commit_cb;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int8_t inited;
|
||||||
|
tmr_h timer;
|
||||||
|
} SMqMgmt;
|
||||||
|
|
||||||
|
static SMqMgmt tmqMgmt = {0};
|
||||||
|
|
||||||
struct tmq_t {
|
struct tmq_t {
|
||||||
// conf
|
// conf
|
||||||
char groupId[TSDB_CGROUP_LEN];
|
char groupId[TSDB_CGROUP_LEN];
|
||||||
char clientId[256];
|
char clientId[256];
|
||||||
int8_t autoCommit;
|
int8_t autoCommit;
|
||||||
/*int8_t inWaiting;*/
|
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
int32_t epoch;
|
|
||||||
int32_t resetOffsetCfg;
|
int32_t resetOffsetCfg;
|
||||||
int64_t status;
|
|
||||||
STscObj* pTscObj;
|
|
||||||
tmq_commit_cb* commit_cb;
|
tmq_commit_cb* commit_cb;
|
||||||
/*int32_t nextTopicIdx;*/
|
|
||||||
|
// status
|
||||||
|
int8_t status;
|
||||||
int8_t epStatus;
|
int8_t epStatus;
|
||||||
|
int32_t epoch;
|
||||||
int32_t epSkipCnt;
|
int32_t epSkipCnt;
|
||||||
/*int32_t waitingRequest;*/
|
|
||||||
/*int32_t readyRequest;*/
|
|
||||||
SArray* clientTopics; // SArray<SMqClientTopic>
|
|
||||||
STaosQueue* mqueue; // queue of tmq_message_t
|
|
||||||
STaosQall* qall;
|
|
||||||
tsem_t rspSem;
|
|
||||||
// stat
|
|
||||||
int64_t pollCnt;
|
int64_t pollCnt;
|
||||||
|
|
||||||
|
// connection
|
||||||
|
STscObj* pTscObj;
|
||||||
|
|
||||||
|
// container
|
||||||
|
SArray* clientTopics; // SArray<SMqClientTopic>
|
||||||
|
STaosQueue* mqueue; // queue of rsp
|
||||||
|
STaosQall* qall;
|
||||||
|
STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit
|
||||||
|
|
||||||
|
// ctl
|
||||||
|
tsem_t rspSem;
|
||||||
};
|
};
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -93,6 +107,7 @@ enum {
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TMQ_CONSUMER_STATUS__INIT = 0,
|
TMQ_CONSUMER_STATUS__INIT = 0,
|
||||||
|
TMQ_CONSUMER_STATUS__SUBSCRIBED,
|
||||||
TMQ_CONSUMER_STATUS__READY,
|
TMQ_CONSUMER_STATUS__READY,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -110,13 +125,11 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
// subscribe info
|
// subscribe info
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
char* topicName;
|
char* topicName;
|
||||||
int64_t topicId;
|
|
||||||
SArray* vgs; // SArray<SMqClientVg>
|
SArray* vgs; // SArray<SMqClientVg>
|
||||||
|
|
||||||
int8_t isSchemaAdaptive;
|
int8_t isSchemaAdaptive;
|
||||||
int32_t numOfFields;
|
|
||||||
SSchemaWrapper schema;
|
SSchemaWrapper schema;
|
||||||
} SMqClientTopic;
|
} SMqClientTopic;
|
||||||
|
|
||||||
|
@ -156,7 +169,6 @@ typedef struct {
|
||||||
int32_t async;
|
int32_t async;
|
||||||
tsem_t rspSem;
|
tsem_t rspSem;
|
||||||
tmq_resp_err_t rspErr;
|
tmq_resp_err_t rspErr;
|
||||||
/*SMqClientVg* pVg;*/
|
|
||||||
} SMqCommitCbParam;
|
} SMqCommitCbParam;
|
||||||
|
|
||||||
tmq_conf_t* tmq_conf_new() {
|
tmq_conf_t* tmq_conf_new() {
|
||||||
|
@ -251,13 +263,7 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) {
|
||||||
|
|
||||||
void tmq_list_destroy(tmq_list_t* list) {
|
void tmq_list_destroy(tmq_list_t* list) {
|
||||||
SArray* container = &list->container;
|
SArray* container = &list->container;
|
||||||
/*taosArrayDestroy(container);*/
|
taosArrayDestroyP(container, taosMemoryFree);
|
||||||
int32_t sz = taosArrayGetSize(container);
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
char* str = taosArrayGetP(container, i);
|
|
||||||
taosMemoryFree(str);
|
|
||||||
}
|
|
||||||
taosArrayDestroy(container);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmq_list_get_size(const tmq_list_t* list) {
|
int32_t tmq_list_get_size(const tmq_list_t* list) {
|
||||||
|
@ -298,6 +304,8 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
|
||||||
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
|
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
|
||||||
pParam->rspErr = code;
|
pParam->rspErr = code;
|
||||||
|
tmq_t* tmq = pParam->tmq;
|
||||||
|
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__SUBSCRIBED);
|
||||||
tsem_post(&pParam->rspSem);
|
tsem_post(&pParam->rspSem);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -335,12 +343,9 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pTmq->pTscObj = (STscObj*)conn;
|
pTmq->pTscObj = (STscObj*)conn;
|
||||||
/*pTmq->inWaiting = 0;*/
|
|
||||||
pTmq->status = 0;
|
pTmq->status = 0;
|
||||||
pTmq->pollCnt = 0;
|
pTmq->pollCnt = 0;
|
||||||
pTmq->epoch = 0;
|
pTmq->epoch = 0;
|
||||||
/*pTmq->waitingRequest = 0;*/
|
|
||||||
/*pTmq->readyRequest = 0;*/
|
|
||||||
pTmq->epStatus = 0;
|
pTmq->epStatus = 0;
|
||||||
pTmq->epSkipCnt = 0;
|
pTmq->epSkipCnt = 0;
|
||||||
// set conf
|
// set conf
|
||||||
|
@ -367,26 +372,45 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
|
// init timer
|
||||||
|
int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
|
||||||
|
if (inited == 0) {
|
||||||
|
tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
|
||||||
|
if (tmqMgmt.timer == NULL) {
|
||||||
|
atomic_store_8(&tmqMgmt.inited, 0);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
|
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
|
||||||
if (pTmq == NULL) {
|
if (pTmq == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
|
const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
|
||||||
const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;
|
const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;
|
||||||
|
|
||||||
ASSERT(user);
|
ASSERT(user);
|
||||||
ASSERT(pass);
|
ASSERT(pass);
|
||||||
/*ASSERT(conf->db);*/
|
|
||||||
ASSERT(conf->groupId[0]);
|
ASSERT(conf->groupId[0]);
|
||||||
|
|
||||||
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
|
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
|
||||||
if (pTmq->pTscObj == NULL) return NULL;
|
pTmq->mqueue = taosOpenQueue();
|
||||||
|
pTmq->qall = taosAllocateQall();
|
||||||
|
pTmq->delayedTask = taosOpenQueue();
|
||||||
|
|
||||||
pTmq->status = 0;
|
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// init status
|
||||||
|
pTmq->status = TMQ_CONSUMER_STATUS__INIT;
|
||||||
pTmq->pollCnt = 0;
|
pTmq->pollCnt = 0;
|
||||||
pTmq->epoch = 0;
|
pTmq->epoch = 0;
|
||||||
pTmq->epStatus = 0;
|
pTmq->epStatus = 0;
|
||||||
pTmq->epSkipCnt = 0;
|
pTmq->epSkipCnt = 0;
|
||||||
|
|
||||||
// set conf
|
// set conf
|
||||||
strcpy(pTmq->clientId, conf->clientId);
|
strcpy(pTmq->clientId, conf->clientId);
|
||||||
strcpy(pTmq->groupId, conf->groupId);
|
strcpy(pTmq->groupId, conf->groupId);
|
||||||
|
@ -394,19 +418,30 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
pTmq->commit_cb = conf->commit_cb;
|
pTmq->commit_cb = conf->commit_cb;
|
||||||
pTmq->resetOffsetCfg = conf->resetOffset;
|
pTmq->resetOffsetCfg = conf->resetOffset;
|
||||||
|
|
||||||
|
// assign consumerId
|
||||||
pTmq->consumerId = tGenIdPI64();
|
pTmq->consumerId = tGenIdPI64();
|
||||||
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
|
|
||||||
if (pTmq->clientTopics == NULL) {
|
// init semaphore
|
||||||
taosMemoryFree(pTmq);
|
if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
|
||||||
return NULL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTmq->mqueue = taosOpenQueue();
|
// init connection
|
||||||
pTmq->qall = taosAllocateQall();
|
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
|
||||||
|
if (pTmq->pTscObj == NULL) {
|
||||||
tsem_init(&pTmq->rspSem, 0, 0);
|
tsem_destroy(&pTmq->rspSem);
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
return pTmq;
|
return pTmq;
|
||||||
|
|
||||||
|
FAIL:
|
||||||
|
if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
|
||||||
|
if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
|
||||||
|
if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
|
||||||
|
if (pTmq->qall) taosFreeQall(pTmq->qall);
|
||||||
|
taosMemoryFree(pTmq);
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
|
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
|
||||||
|
@ -497,86 +532,64 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
|
||||||
return resp;
|
return resp;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
SRequestObj* pRequest = NULL;
|
const SArray* container = &topic_list->container;
|
||||||
SArray* container = &topic_list->container;
|
|
||||||
int32_t sz = taosArrayGetSize(container);
|
int32_t sz = taosArrayGetSize(container);
|
||||||
// destroy ex
|
void* buf = NULL;
|
||||||
taosArrayDestroy(tmq->clientTopics);
|
SCMSubscribeReq req = {0};
|
||||||
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
|
int32_t code = -1;
|
||||||
|
|
||||||
SCMSubscribeReq req;
|
|
||||||
req.consumerId = tmq->consumerId;
|
req.consumerId = tmq->consumerId;
|
||||||
strcpy(req.cgroup, tmq->groupId);
|
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
|
||||||
req.topicNames = taosArrayInit(sz, sizeof(void*));
|
req.topicNames = taosArrayInit(sz, sizeof(void*));
|
||||||
|
if (req.topicNames == NULL) goto FAIL;
|
||||||
|
|
||||||
for (int i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
/*char* topicName = topic_list->elems[i];*/
|
char* topic = taosArrayGetP(container, i);
|
||||||
char* topicName = taosArrayGetP(container, i);
|
|
||||||
|
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
#if 0
|
tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
|
||||||
char* dbName = getDbOfConnection(tmq->pTscObj);
|
|
||||||
if (dbName == NULL) {
|
|
||||||
return TMQ_RESP_ERR__FAIL;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
tNameSetDbName(&name, tmq->pTscObj->acctId, topicName, strlen(topicName));
|
|
||||||
#if 0
|
|
||||||
tNameFromString(&name, topicName, T_NAME_TABLE);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
char* topicFname = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
|
char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
|
||||||
if (topicFname == NULL) {
|
if (topicFName == NULL) {
|
||||||
goto _return;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
tNameExtractFullName(&name, topicFname);
|
tNameExtractFullName(&name, topicFName);
|
||||||
tscDebug("subscribe topic: %s", topicFname);
|
|
||||||
SMqClientTopic topic = {
|
tscDebug("subscribe topic: %s", topicFName);
|
||||||
.sql = NULL,
|
|
||||||
.sqlLen = 0,
|
taosArrayPush(req.topicNames, &topicFName);
|
||||||
.topicId = 0,
|
|
||||||
.topicName = topicFname,
|
|
||||||
.vgs = NULL,
|
|
||||||
};
|
|
||||||
topic.vgs = taosArrayInit(0, sizeof(SMqClientVg));
|
|
||||||
taosArrayPush(tmq->clientTopics, &topic);
|
|
||||||
taosArrayPush(req.topicNames, &topicFname);
|
|
||||||
#if 0
|
|
||||||
taosMemoryFree(dbName);
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int tlen = tSerializeSCMSubscribeReq(NULL, &req);
|
int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
|
||||||
void* buf = taosMemoryMalloc(tlen);
|
buf = taosMemoryMalloc(tlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) goto FAIL;
|
||||||
goto _return;
|
|
||||||
}
|
|
||||||
|
|
||||||
void* abuf = buf;
|
void* abuf = buf;
|
||||||
tSerializeSCMSubscribeReq(&abuf, &req);
|
tSerializeSCMSubscribeReq(&abuf, &req);
|
||||||
/*printf("formatted: %s\n", dagStr);*/
|
|
||||||
|
|
||||||
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE);
|
SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
|
||||||
if (pRequest == NULL) {
|
if (sendInfo == NULL) goto FAIL;
|
||||||
tscError("failed to malloc request");
|
|
||||||
}
|
|
||||||
|
|
||||||
SMqSubscribeCbParam param = {
|
SMqSubscribeCbParam param = {
|
||||||
.rspErr = TMQ_RESP_ERR__SUCCESS,
|
.rspErr = TMQ_RESP_ERR__SUCCESS,
|
||||||
.tmq = tmq,
|
.tmq = tmq,
|
||||||
};
|
};
|
||||||
tsem_init(¶m.rspSem, 0, 0);
|
|
||||||
|
|
||||||
pRequest->body.requestMsg = (SDataBuf){
|
if (tsem_init(¶m.rspSem, 0, 0) != 0) goto FAIL;
|
||||||
|
|
||||||
|
sendInfo->msgInfo = (SDataBuf){
|
||||||
.pData = buf,
|
.pData = buf,
|
||||||
.len = tlen,
|
.len = tlen,
|
||||||
.handle = NULL,
|
.handle = NULL,
|
||||||
};
|
};
|
||||||
|
|
||||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
sendInfo->requestId = generateRequestId();
|
||||||
|
sendInfo->requestObjRefId = 0;
|
||||||
sendInfo->param = ¶m;
|
sendInfo->param = ¶m;
|
||||||
sendInfo->fp = tmqSubscribeCb;
|
sendInfo->fp = tmqSubscribeCb;
|
||||||
|
sendInfo->msgType = TDMT_MND_SUBSCRIBE;
|
||||||
|
|
||||||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
|
@ -585,15 +598,28 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
||||||
tsem_wait(¶m.rspSem);
|
tsem_wait(¶m.rspSem);
|
||||||
tsem_destroy(¶m.rspSem);
|
tsem_destroy(¶m.rspSem);
|
||||||
|
|
||||||
_return:
|
code = param.rspErr;
|
||||||
/*if (sendInfo != NULL) {*/
|
if (code != 0) goto FAIL;
|
||||||
/*destroySendMsgInfo(sendInfo);*/
|
|
||||||
/*}*/
|
|
||||||
|
|
||||||
return param.rspErr;
|
// TODO: add max retry cnt
|
||||||
|
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, true)) {
|
||||||
|
tscDebug("not ready, retry\n");
|
||||||
|
taosMsleep(500);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
FAIL:
|
||||||
|
if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
|
||||||
|
if (code != 0) {
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; }
|
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
|
||||||
|
//
|
||||||
|
conf->commit_cb = cb;
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
|
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
|
||||||
STscObj* pTscObj = (STscObj*)taos;
|
STscObj* pTscObj = (STscObj*)taos;
|
||||||
|
@ -627,9 +653,6 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
||||||
CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode, NULL), _return);
|
CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode, NULL), _return);
|
||||||
|
|
||||||
// todo check for invalid sql statement and return with error code
|
|
||||||
|
|
||||||
CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);
|
CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);
|
||||||
|
|
||||||
/*printf("%s\n", pStr);*/
|
/*printf("%s\n", pStr);*/
|
||||||
|
@ -653,7 +676,6 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa
|
||||||
}
|
}
|
||||||
|
|
||||||
tSerializeSCMCreateStreamReq(buf, tlen, &req);
|
tSerializeSCMCreateStreamReq(buf, tlen, &req);
|
||||||
/*printf("formatted: %s\n", dagStr);*/
|
|
||||||
|
|
||||||
pRequest->body.requestMsg = (SDataBuf){
|
pRequest->body.requestMsg = (SDataBuf){
|
||||||
.pData = buf,
|
.pData = buf,
|
||||||
|
@ -684,94 +706,6 @@ _return:
|
||||||
return pRequest;
|
return pRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
|
|
||||||
STscObj* pTscObj = (STscObj*)taos;
|
|
||||||
SRequestObj* pRequest = NULL;
|
|
||||||
SQuery* pQueryNode = NULL;
|
|
||||||
char* astStr = NULL;
|
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
|
||||||
if (taos == NULL || topicName == NULL || sql == NULL) {
|
|
||||||
tscError("invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s", taos, topicName, sql);
|
|
||||||
terrno = TSDB_CODE_TSC_INVALID_INPUT;
|
|
||||||
goto _return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (strlen(topicName) >= TSDB_TOPIC_NAME_LEN) {
|
|
||||||
tscError("topic name too long, max length:%d", TSDB_TOPIC_NAME_LEN - 1);
|
|
||||||
terrno = TSDB_CODE_TSC_INVALID_INPUT;
|
|
||||||
goto _return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
|
|
||||||
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
|
|
||||||
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
|
|
||||||
goto _return;
|
|
||||||
}
|
|
||||||
|
|
||||||
tscDebug("start to create topic: %s", topicName);
|
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
|
||||||
CHECK_CODE_GOTO(parseSql(pRequest, true, &pQueryNode), _return);
|
|
||||||
|
|
||||||
// todo check for invalid sql statement and return with error code
|
|
||||||
|
|
||||||
CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);
|
|
||||||
|
|
||||||
/*printf("%s\n", pStr);*/
|
|
||||||
|
|
||||||
SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
|
|
||||||
strcpy(name.dbname, pRequest->pDb);
|
|
||||||
strcpy(name.tname, topicName);
|
|
||||||
|
|
||||||
SCMCreateTopicReq req = {
|
|
||||||
.igExists = 1,
|
|
||||||
.ast = astStr,
|
|
||||||
.sql = (char*)sql,
|
|
||||||
};
|
|
||||||
tNameExtractFullName(&name, req.name);
|
|
||||||
|
|
||||||
int tlen = tSerializeSCMCreateTopicReq(NULL, 0, &req);
|
|
||||||
void* buf = taosMemoryMalloc(tlen);
|
|
||||||
if (buf == NULL) {
|
|
||||||
goto _return;
|
|
||||||
}
|
|
||||||
|
|
||||||
tSerializeSCMCreateTopicReq(buf, tlen, &req);
|
|
||||||
/*printf("formatted: %s\n", dagStr);*/
|
|
||||||
|
|
||||||
pRequest->body.requestMsg = (SDataBuf){
|
|
||||||
.pData = buf,
|
|
||||||
.len = tlen,
|
|
||||||
.handle = NULL,
|
|
||||||
};
|
|
||||||
pRequest->type = TDMT_MND_CREATE_TOPIC;
|
|
||||||
|
|
||||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
|
||||||
SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
|
||||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
|
||||||
|
|
||||||
tsem_wait(&pRequest->body.rspSem);
|
|
||||||
|
|
||||||
_return:
|
|
||||||
taosMemoryFreeClear(astStr);
|
|
||||||
qDestroyQuery(pQueryNode);
|
|
||||||
/*if (sendInfo != NULL) {*/
|
|
||||||
/*destroySendMsgInfo(sendInfo);*/
|
|
||||||
/*}*/
|
|
||||||
|
|
||||||
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
|
|
||||||
pRequest->code = terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
return pRequest;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
|
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
|
||||||
if (tmq_message == NULL) return 0;
|
if (tmq_message == NULL) return 0;
|
||||||
|
@ -954,7 +888,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
|
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
|
||||||
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
|
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
|
||||||
if (tmqUpdateEp(tmq, head->epoch, &rsp)) {
|
if (tmqUpdateEp(tmq, head->epoch, &rsp)) {
|
||||||
atomic_store_64(&tmq->status, TMQ_CONSUMER_STATUS__READY);
|
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
|
||||||
}
|
}
|
||||||
tDeleteSMqCMGetSubEpRsp(&rsp);
|
tDeleteSMqCMGetSubEpRsp(&rsp);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1189,7 +1123,6 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
/*printf("send poll\n");*/
|
/*printf("send poll\n");*/
|
||||||
/*atomic_add_fetch_32(&tmq->waitingRequest, 1);*/
|
|
||||||
tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
|
tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
|
||||||
pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
|
pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
|
||||||
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
|
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
|
||||||
|
@ -1268,12 +1201,14 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
||||||
SMqRspObj* rspObj;
|
SMqRspObj* rspObj;
|
||||||
int64_t startTime = taosGetTimestampMs();
|
int64_t startTime = taosGetTimestampMs();
|
||||||
|
|
||||||
// TODO: put into another thread or delayed queue
|
// TODO: put into delayed queue
|
||||||
int64_t status = atomic_load_64(&tmq->status);
|
#if 0
|
||||||
while (0 != tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT)) {
|
int8_t status = atomic_load_8(&tmq->status);
|
||||||
|
while (0 != tmqAskEp(tmq, status != TMQ_CONSUMER_STATUS__READY)) {
|
||||||
tscDebug("not ready, retry\n");
|
tscDebug("not ready, retry\n");
|
||||||
taosSsleep(1);
|
taosSsleep(1);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
|
rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
|
||||||
if (rspObj) {
|
if (rspObj) {
|
||||||
|
@ -1281,7 +1216,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
/*printf("cycle\n");*/
|
|
||||||
tmqAskEp(tmq, false);
|
tmqAskEp(tmq, false);
|
||||||
tmqPollImpl(tmq, blocking_time);
|
tmqPollImpl(tmq, blocking_time);
|
||||||
|
|
||||||
|
|
|
@ -202,6 +202,17 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = vnodeStart(pImpl);
|
||||||
|
if (code != 0) {
|
||||||
|
tFreeSCreateVnodeReq(&createReq);
|
||||||
|
dError("vgId:%d, failed to start sync since %s", createReq.vgId, terrstr());
|
||||||
|
vnodeClose(pImpl);
|
||||||
|
vnodeDestroy(path, pMgmt->pTfs);
|
||||||
|
terrno = code;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
code = vmWriteVnodesToFile(pMgmt);
|
code = vmWriteVnodesToFile(pMgmt);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tFreeSCreateVnodeReq(&createReq);
|
tFreeSCreateVnodeReq(&createReq);
|
||||||
|
|
|
@ -74,12 +74,6 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// sync integration
|
|
||||||
vnodeSyncSetQ(pImpl, NULL);
|
|
||||||
vnodeSyncSetRpc(pImpl, NULL);
|
|
||||||
int32_t ret = vnodeSyncStart(pImpl);
|
|
||||||
assert(ret == 0);
|
|
||||||
|
|
||||||
taosWLockLatch(&pMgmt->latch);
|
taosWLockLatch(&pMgmt->latch);
|
||||||
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
|
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
|
||||||
taosWUnLockLatch(&pMgmt->latch);
|
taosWUnLockLatch(&pMgmt->latch);
|
||||||
|
@ -153,6 +147,7 @@ static void *vmOpenVnodeFunc(void *param) {
|
||||||
pThread->failed++;
|
pThread->failed++;
|
||||||
} else {
|
} else {
|
||||||
vmOpenVnode(pMgmt, pCfg, pImpl);
|
vmOpenVnode(pMgmt, pCfg, pImpl);
|
||||||
|
//vnodeStart(pImpl);
|
||||||
dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
|
dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||||
pThread->opened++;
|
pThread->opened++;
|
||||||
}
|
}
|
||||||
|
@ -364,10 +359,52 @@ static int32_t vmRequire(SMgmtWrapper *pWrapper, bool *required) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t vmStart(SMgmtWrapper *pWrapper) {
|
||||||
|
dDebug("vnode-mgmt start to run");
|
||||||
|
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
|
||||||
|
|
||||||
|
taosRLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
void *pIter = taosHashIterate(pMgmt->hash, NULL);
|
||||||
|
while (pIter) {
|
||||||
|
SVnodeObj **ppVnode = pIter;
|
||||||
|
if (ppVnode == NULL || *ppVnode == NULL) continue;
|
||||||
|
|
||||||
|
SVnodeObj *pVnode = *ppVnode;
|
||||||
|
vnodeStart(pVnode->pImpl);
|
||||||
|
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void vmStop(SMgmtWrapper *pWrapper) {
|
||||||
|
#if 0
|
||||||
|
dDebug("vnode-mgmt start to stop");
|
||||||
|
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
|
||||||
|
taosRLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
void *pIter = taosHashIterate(pMgmt->hash, NULL);
|
||||||
|
while (pIter) {
|
||||||
|
SVnodeObj **ppVnode = pIter;
|
||||||
|
if (ppVnode == NULL || *ppVnode == NULL) continue;
|
||||||
|
|
||||||
|
SVnodeObj *pVnode = *ppVnode;
|
||||||
|
vnodeStop(pVnode->pImpl);
|
||||||
|
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
void vmSetMgmtFp(SMgmtWrapper *pWrapper) {
|
void vmSetMgmtFp(SMgmtWrapper *pWrapper) {
|
||||||
SMgmtFp mgmtFp = {0};
|
SMgmtFp mgmtFp = {0};
|
||||||
mgmtFp.openFp = vmInit;
|
mgmtFp.openFp = vmInit;
|
||||||
mgmtFp.closeFp = vmCleanup;
|
mgmtFp.closeFp = vmCleanup;
|
||||||
|
mgmtFp.startFp = vmStart;
|
||||||
|
mgmtFp.stopFp = vmStop;
|
||||||
mgmtFp.requiredFp = vmRequire;
|
mgmtFp.requiredFp = vmRequire;
|
||||||
|
|
||||||
vmInitMsgHandle(pWrapper);
|
vmInitMsgHandle(pWrapper);
|
||||||
|
|
|
@ -61,6 +61,9 @@ int32_t vnodeSync(SVnode *pVnode);
|
||||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||||
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
|
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
|
||||||
|
|
||||||
|
int32_t vnodeStart(SVnode *pVnode);
|
||||||
|
void vnodeStop(SVnode *pVnode);
|
||||||
|
|
||||||
int64_t vnodeGetSyncHandle(SVnode *pVnode);
|
int64_t vnodeGetSyncHandle(SVnode *pVnode);
|
||||||
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot);
|
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot);
|
||||||
|
|
||||||
|
@ -171,11 +174,6 @@ typedef struct {
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
} STableKeyInfo;
|
} STableKeyInfo;
|
||||||
|
|
||||||
// sync integration
|
|
||||||
void vnodeSyncSetQ(SVnode *pVnode, void *qHandle);
|
|
||||||
void vnodeSyncSetRpc(SVnode *pVnode, void *rpcHandle);
|
|
||||||
int32_t vnodeSyncStart(SVnode *pVnode);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
#include "vnodeSync.h"
|
||||||
|
|
||||||
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
||||||
SVnodeInfo info = {0};
|
SVnodeInfo info = {0};
|
||||||
|
@ -171,6 +172,16 @@ void vnodeClose(SVnode *pVnode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// start the sync timer after the queue is ready
|
||||||
|
int32_t vnodeStart(SVnode *pVnode) {
|
||||||
|
vnodeSyncSetQ(pVnode, NULL);
|
||||||
|
vnodeSyncSetRpc(pVnode, NULL);
|
||||||
|
vnodeSyncStart(pVnode);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void vnodeStop(SVnode *pVnode) {}
|
||||||
|
|
||||||
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
|
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
|
||||||
|
|
||||||
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { pSnapshot->lastApplyIndex = pVnode->state.committed; }
|
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { pSnapshot->lastApplyIndex = pVnode->state.committed; }
|
|
@ -457,6 +457,20 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
|
||||||
|
|
||||||
void syncNodeStart(SSyncNode* pSyncNode) {
|
void syncNodeStart(SSyncNode* pSyncNode) {
|
||||||
// start raft
|
// start raft
|
||||||
|
if (pSyncNode->replicaNum == 1) {
|
||||||
|
syncNodeBecomeLeader(pSyncNode);
|
||||||
|
|
||||||
|
syncNodeLog2("==state change become leader immediately==", pSyncNode);
|
||||||
|
|
||||||
|
// Raft 3.6.2 Committing entries from previous terms
|
||||||
|
|
||||||
|
// use this now
|
||||||
|
syncNodeAppendNoop(pSyncNode);
|
||||||
|
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
syncNodeBecomeFollower(pSyncNode);
|
syncNodeBecomeFollower(pSyncNode);
|
||||||
|
|
||||||
// for test
|
// for test
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
|
|
@ -403,6 +403,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, "Invalid msg checksum"
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type")
|
||||||
|
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
|
||||||
|
|
||||||
// wal
|
// wal
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Unexpected generic error in wal")
|
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Unexpected generic error in wal")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted")
|
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted")
|
||||||
|
|
|
@ -395,7 +395,7 @@ void shellClearScreen(int32_t ecmd_pos, int32_t cursor_pos) {
|
||||||
void shellShowOnScreen(SShellCmd *cmd) {
|
void shellShowOnScreen(SShellCmd *cmd) {
|
||||||
struct winsize w;
|
struct winsize w;
|
||||||
if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) {
|
if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) {
|
||||||
fprintf(stderr, "No stream device\n");
|
// fprintf(stderr, "No stream device\n");
|
||||||
w.ws_col = 120;
|
w.ws_col = 120;
|
||||||
w.ws_row = 30;
|
w.ws_row = 30;
|
||||||
}
|
}
|
||||||
|
|
|
@ -750,7 +750,7 @@ void shellReadHistory() {
|
||||||
|
|
||||||
void shellWriteHistory() {
|
void shellWriteHistory() {
|
||||||
SShellHistory *pHistory = &shell.history;
|
SShellHistory *pHistory = &shell.history;
|
||||||
TdFilePtr pFile = taosOpenFile(pHistory->file, TD_FILE_WRITE | TD_FILE_STREAM);
|
TdFilePtr pFile = taosOpenFile(pHistory->file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_STREAM | TD_FILE_APPEND);
|
||||||
if (pFile == NULL) return;
|
if (pFile == NULL) return;
|
||||||
|
|
||||||
for (int32_t i = pHistory->hstart; i != pHistory->hend;) {
|
for (int32_t i = pHistory->hstart; i != pHistory->hend;) {
|
||||||
|
|
Loading…
Reference in New Issue