Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/meta_3.0
This commit is contained in:
commit
203444f6bc
|
@ -87,6 +87,25 @@ void commitSync() throws SQLException;
|
|||
void close() throws SQLException;
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem label="Go" value="Go">
|
||||
|
||||
```go
|
||||
func NewConsumer(conf *Config) (*Consumer, error)
|
||||
|
||||
func (c *Consumer) Close() error
|
||||
|
||||
func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
|
||||
|
||||
func (c *Consumer) FreeMessage(message unsafe.Pointer)
|
||||
|
||||
func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
|
||||
|
||||
func (c *Consumer) Subscribe(topics []string) error
|
||||
|
||||
func (c *Consumer) Unsubscribe() error
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
|
@ -229,6 +248,56 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
|
|||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem label="Go" value="Go">
|
||||
|
||||
```go
|
||||
config := tmq.NewConfig()
|
||||
defer config.Destroy()
|
||||
err = config.SetGroupID("test")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetAutoOffsetReset("earliest")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetConnectIP("127.0.0.1")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetConnectUser("root")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetConnectPass("taosdata")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetConnectPort("6030")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetMsgWithTableName(true)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.EnableHeartBeat()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
|
||||
if result.ErrCode != 0 {
|
||||
errStr := wrapper.TMQErr2Str(result.ErrCode)
|
||||
err := errors.NewError(int(result.ErrCode), errStr)
|
||||
panic(err)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
|
@ -260,6 +329,20 @@ topics.add("tmq_topic");
|
|||
consumer.subscribe(topics);
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="Go" label="Go">
|
||||
|
||||
```go
|
||||
consumer, err := tmq.NewConsumer(config)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = consumer.Subscribe([]string{"example_tmq_topic"})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
</Tabs>
|
||||
|
@ -293,6 +376,21 @@ while(running){
|
|||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="Go" label="Go">
|
||||
|
||||
```go
|
||||
for {
|
||||
result, err := consumer.Poll(time.Second)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
fmt.Println(result)
|
||||
consumer.Commit(context.Background(), result.Message)
|
||||
consumer.FreeMessage(result.Message)
|
||||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
|
@ -322,6 +420,13 @@ consumer.unsubscribe();
|
|||
consumer.close();
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="Go" label="Go">
|
||||
|
||||
```go
|
||||
consumer.Close()
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ extern bool gRaftDetailLog;
|
|||
#define SYNC_SPEED_UP_HB_TIMER 400
|
||||
#define SYNC_SPEED_UP_AFTER_MS (1000 * 20)
|
||||
#define SYNC_SLOW_DOWN_RANGE 100
|
||||
#define SYNC_MAX_READ_RANGE 10
|
||||
|
||||
#define SYNC_MAX_BATCH_SIZE 1
|
||||
#define SYNC_INDEX_BEGIN 0
|
||||
|
@ -210,9 +211,12 @@ void syncStop(int64_t rid);
|
|||
int32_t syncSetStandby(int64_t rid);
|
||||
ESyncState syncGetMyRole(int64_t rid);
|
||||
bool syncIsReady(int64_t rid);
|
||||
bool syncIsReadyForRead(int64_t rid);
|
||||
const char* syncGetMyRoleStr(int64_t rid);
|
||||
bool syncRestoreFinish(int64_t rid);
|
||||
SyncTerm syncGetMyTerm(int64_t rid);
|
||||
SyncIndex syncGetLastIndex(int64_t rid);
|
||||
SyncIndex syncGetCommitIndex(int64_t rid);
|
||||
SyncGroupId syncGetVgId(int64_t rid);
|
||||
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
|
|
|
@ -98,6 +98,7 @@ void vnodeSyncStart(SVnode* pVnode);
|
|||
void vnodeSyncClose(SVnode* pVnode);
|
||||
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
|
||||
bool vnodeIsLeader(SVnode* pVnode);
|
||||
bool vnodeIsReadyForRead(SVnode* pVnode);
|
||||
bool vnodeIsRoleLeader(SVnode* pVnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -283,7 +283,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
vTrace("message in vnode query queue is processing");
|
||||
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
|
||||
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsReadyForRead(pVnode)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
@ -307,7 +307,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
|
||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
||||
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||
!vnodeIsLeader(pVnode)) {
|
||||
!vnodeIsReadyForRead(pVnode)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -781,3 +781,17 @@ bool vnodeIsLeader(SVnode *pVnode) {
|
|||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool vnodeIsReadyForRead(SVnode *pVnode) {
|
||||
if (syncIsReady(pVnode->sync)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (syncIsReadyForRead(pVnode->sync)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
vDebug("vgId:%d, vnode not ready for read, state:%s, last:%ld, cmt:%ld", pVnode->config.vgId,
|
||||
syncGetMyRoleStr(pVnode->sync), syncGetLastIndex(pVnode->sync), syncGetCommitIndex(pVnode->sync));
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -392,6 +392,29 @@ bool syncIsReady(int64_t rid) {
|
|||
return b;
|
||||
}
|
||||
|
||||
bool syncIsReadyForRead(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return false;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
|
||||
// TODO: last not noop?
|
||||
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
||||
bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && (pSyncNode->commitIndex >= lastIndex - SYNC_MAX_READ_RANGE);
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
|
||||
// if false, set error code
|
||||
if (false == b) {
|
||||
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
||||
terrno = TSDB_CODE_SYN_NOT_LEADER;
|
||||
} else {
|
||||
terrno = TSDB_CODE_APP_NOT_READY;
|
||||
}
|
||||
}
|
||||
return b;
|
||||
}
|
||||
|
||||
bool syncIsRestoreFinish(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
|
@ -519,6 +542,30 @@ SyncTerm syncGetMyTerm(int64_t rid) {
|
|||
return term;
|
||||
}
|
||||
|
||||
SyncIndex syncGetLastIndex(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return SYNC_INDEX_INVALID;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
return lastIndex;
|
||||
}
|
||||
|
||||
SyncIndex syncGetCommitIndex(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return SYNC_INDEX_INVALID;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
SyncIndex cmtIndex = pSyncNode->commitIndex;
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
return cmtIndex;
|
||||
}
|
||||
|
||||
SyncGroupId syncGetVgId(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
|
@ -828,6 +875,15 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
|||
pSyncNode->changing = true;
|
||||
}
|
||||
|
||||
// not restored, vnode enable
|
||||
if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
|
||||
ret = -1;
|
||||
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
|
||||
sError("vgId:%d, failed to sync propose since not ready, type:%s, last:%ld, cmt:%ld", pSyncNode->vgId,
|
||||
TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
|
||||
goto _END;
|
||||
}
|
||||
|
||||
SRespStub stub;
|
||||
stub.createTime = taosGetTimestampMs();
|
||||
stub.rpcMsg = *pMsg;
|
||||
|
|
Loading…
Reference in New Issue