跳到主要内容

· 阅读需 19 分钟

作者 | 刘晓敏 于雨

一、简介

Java 的世界里,大家广泛使用的一个高性能网络通信框架 netty,很多 RPC 框架都是基于 netty 来实现的。在 golang 的世界里,getty 也是一个类似 netty 的高性能网络通信库。getty 最初由 dubbogo 项目负责人于雨开发,作为底层通信库在 dubbo-go 中使用。随着 dubbo-go 捐献给 apache 基金会,在社区小伙伴的共同努力下,getty 也最终进入到 apache 这个大家庭,并改名 dubbo-getty

18 年的时候,我在公司里实践微服务,当时遇到最大的问题就是分布式事务问题。同年,阿里在社区开源他们的分布式事务解决方案,我也很快关注到这个项目,起初还叫 fescar,后来更名 seata。由于我对开源技术很感兴趣,加了很多社区群,当时也很关注 dubbo-go 这个项目,在里面默默潜水。随着对 seata 的了解,逐渐萌生了做一个 go 版本的分布式事务框架的想法。

要做一个 golang 版的分布式事务框架,首要的一个问题就是如何实现 RPC 通信。dubbo-go 就是很好的一个例子摆在眼前,遂开始研究 dubbo-go 的底层 getty。

二、如何基于 getty 实现 RPC 通信

getty 框架的整体模型图如下:

image.png

下面结合相关代码,详述 seata-golang 的 RPC 通信过程。

1. 建立连接

实现 RPC 通信,首先要建立网络连接吧,我们从 client.go 开始看起。

func (c *client) connect() {
var (
err error
ss Session
)

for {
// 建立一个 session 连接
ss = c.dial()
if ss == nil {
// client has been closed
break
}
err = c.newSession(ss)
if err == nil {
// 收发报文
ss.(*session).run()
// 此处省略部分代码

break
}
// don't distinguish between tcp connection and websocket connection. Because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close()
ss.Conn().Close()
}
}

connect() 方法通过 dial() 方法得到了一个 session 连接,进入 dial() 方法:

func (c *client) dial() Session {
switch c.endPointType {
case TCP_CLIENT:
return c.dialTCP()
case UDP_CLIENT:
return c.dialUDP()
case WS_CLIENT:
return c.dialWS()
case WSS_CLIENT:
return c.dialWSS()
}

return nil
}

我们关注的是 TCP 连接,所以继续进入 c.dialTCP() 方法:

func (c *client) dialTCP() Session {
var (
err error
conn net.Conn
)

for {
if c.IsClosed() {
return nil
}
if c.sslEnabled {
if sslConfig, err := c.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig != nil {
d := &net.Dialer{Timeout: connectTimeout}
// 建立加密连接
conn, err = tls.DialWithDialer(d, "tcp", c.addr, sslConfig)
}
} else {
// 建立 tcp 连接
conn, err = net.DialTimeout("tcp", c.addr, connectTimeout)
}
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
if err == nil {
// 返回一个 TCPSession
return newTCPSession(conn, c)
}

log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
<-wheel.After(connectInterval)
}
}

至此,我们知道了 getty 如何建立 TCP 连接,并返回 TCPSession。

2. 收发报文

那它是怎么收发报文的呢,我们回到 connection 方法接着往下看,有这样一行 ss.(*session).run(),在这行代码之后代码都是很简单的操作,我们猜测这行代码运行的逻辑里面一定包含收发报文的逻辑,接着进入 run() 方法:

func (s *session) run() {
// 省略部分代码

go s.handleLoop()
go s.handlePackage()
}

这里起了两个 goroutine,handleLoophandlePackage,看字面意思符合我们的猜想,进入 handleLoop() 方法:

func (s *session) handleLoop() {
// 省略部分代码

for {
// A select blocks until one of its cases is ready to run.
// It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready.
select {
// 省略部分代码

case outPkg, ok = <-s.wQ:
// 省略部分代码

iovec = iovec[:0]
for idx := 0; idx < maxIovecNum; idx++ {
// 通过 s.writer 将 interface{} 类型的 outPkg 编码成二进制的比特
pkgBytes, err = s.writer.Write(s, outPkg)
// 省略部分代码

iovec = append(iovec, pkgBytes)

//省略部分代码
}
// 将这些二进制比特发送出去
err = s.WriteBytesArray(iovec[:]...)
if err != nil {
log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v",
s.sessionToken(), len(iovec), perrors.WithStack(err))
s.stop()
// break LOOP
flag = false
}

case <-wheel.After(s.period):
if flag {
if wsFlag {
err := wsConn.writePing()
if err != nil {
log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
}
}
// 定时执行的逻辑,心跳等
s.listener.OnCron(s)
}
}
}
}

通过上面的代码,我们不难发现,handleLoop() 方法处理的是发送报文的逻辑,RPC 需要发送的消息首先由 s.writer 编码成二进制比特,然后通过建立的 TCP 连接发送出去。这个 s.writer 对应的 Writer 接口是 RPC 框架必须要实现的一个接口。

继续看 handlePackage() 方法:

func (s *session) handlePackage() {
// 省略部分代码

if _, ok := s.Connection.(*gettyTCPConn); ok {
if s.reader == nil {
errStr := fmt.Sprintf("session{name:%s, conn:%#v, reader:%#v}", s.name, s.Connection, s.reader)
log.Error(errStr)
panic(errStr)
}

err = s.handleTCPPackage()
} else if _, ok := s.Connection.(*gettyWSConn); ok {
err = s.handleWSPackage()
} else if _, ok := s.Connection.(*gettyUDPConn); ok {
err = s.handleUDPPackage()
} else {
panic(fmt.Sprintf("unknown type session{%#v}", s))
}
}

进入 handleTCPPackage() 方法:

func (s *session) handleTCPPackage() error {
// 省略部分代码

conn = s.Connection.(*gettyTCPConn)
for {
// 省略部分代码

bufLen = 0
for {
// for clause for the network timeout condition check
// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
// 从 TCP 连接中收到报文
bufLen, err = conn.recv(buf)
// 省略部分代码

break
}
// 省略部分代码

// 将收到的报文二进制比特写入 pkgBuf
pktBuf.Write(buf[:bufLen])
for {
if pktBuf.Len() <= 0 {
break
}
// 通过 s.reader 将收到的报文解码成 RPC 消息
pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())
// 省略部分代码

s.UpdateActive()
// 将收到的消息放入 TaskQueue 供 RPC 消费端消费
s.addTask(pkg)
pktBuf.Next(pkgLen)
// continue to handle case 5
}
if exit {
break
}
}

return perrors.WithStack(err)
}

从上面的代码逻辑我们分析出,RPC 消费端需要将从 TCP 连接收到的二进制比特报文解码成 RPC 能消费的消息,这个工作由 s.reader 实现,所以,我们要构建 RPC 通信层也需要实现 s.reader 对应的 Reader 接口。

3. 底层处理网络报文的逻辑如何与业务逻辑解耦

我们都知道,netty 通过 boss 线程和 worker 线程实现了底层网络逻辑和业务逻辑的解耦。那么,getty 是如何实现的呢?

handlePackage() 方法最后,我们看到,收到的消息被放入了 s.addTask(pkg) 这个方法,接着往下分析:

func (s *session) addTask(pkg interface{}) {
f := func() {
s.listener.OnMessage(s, pkg)
s.incReadPkgNum()
}
if taskPool := s.EndPoint().GetTaskPool(); taskPool != nil {
taskPool.AddTaskAlways(f)
return
}
f()
}

pkg 参数传递到了一个匿名方法,这个方法最终放入了 taskPool。这个方法很关键,在我后来写 seata-golang 代码的时候,就遇到了一个坑,这个坑后面分析。

接着我们看一下 taskPool 的定义:

// NewTaskPoolSimple build a simple task pool
func NewTaskPoolSimple(size int) GenericTaskPool {
if size < 1 {
size = runtime.NumCPU() * 100
}
return &taskPoolSimple{
work: make(chan task),
sem: make(chan struct{}, size),
done: make(chan struct{}),
}
}

构建了一个缓冲大小为 size (默认为  runtime.NumCPU() * 100) 的 channel sem。再看方法 AddTaskAlways(t task)

func (p *taskPoolSimple) AddTaskAlways(t task) {
select {
case <-p.done:
return
default:
}

select {
case p.work <- t:
return
default:
}
select {
case p.work <- t:
case p.sem <- struct{}{}:
p.wg.Add(1)
go p.worker(t)
default:
goSafely(t)
}
}

加入的任务,会先由 len(p.sem) 个 goroutine 去消费,如果没有 goroutine 空闲,则会启动一个临时的 goroutine 去运行 t()。相当于有  len(p.sem) 个 goroutine 组成了 goroutine pool,pool 中的 goroutine 去处理业务逻辑,而不是由处理网络报文的 goroutine 去运行业务逻辑,从而实现了解耦。写 seata-golang 时遇到的一个坑,就是忘记设置 taskPool 造成了处理业务逻辑和处理底层网络报文逻辑的 goroutine 是同一个,我在业务逻辑中阻塞等待一个任务完成时,阻塞了整个 goroutine,使得阻塞期间收不到任何报文。

4. 具体实现

下面的代码见 getty.go

// Reader is used to unmarshal a complete pkg from buffer
type Reader interface {
Read(Session, []byte) (interface{}, int, error)
}

// Writer is used to marshal pkg and write to session
type Writer interface {
// if @Session is udpGettySession, the second parameter is UDPContext.
Write(Session, interface{}) ([]byte, error)
}

// ReadWriter interface use for handle application packages
type ReadWriter interface {
Reader
Writer
}
// EventListener is used to process pkg that received from remote session
type EventListener interface {
// invoked when session opened
// If the return error is not nil, @Session will be closed.
OnOpen(Session) error

// invoked when session closed.
OnClose(Session)

// invoked when got error.
OnError(Session, error)

// invoked periodically, its period can be set by (Session)SetCronPeriod
OnCron(Session)

// invoked when getty received a package. Pls attention that do not handle long time
// logic processing in this func. You'd better set the package's maximum length.
// If the message's length is greater than it, u should should return err in
// Reader{Read} and getty will close this connection soon.
//
// If ur logic processing in this func will take a long time, u should start a goroutine
// pool(like working thread pool in cpp) to handle the processing asynchronously. Or u
// can do the logic processing in other asynchronous way.
// !!!In short, ur OnMessage callback func should return asap.
//
// If this is a udp event listener, the second parameter type is UDPContext.
OnMessage(Session, interface{})
}

通过对整个 getty 代码的分析,我们只要实现  ReadWriter 来对 RPC  消息编解码,再实现 EventListener 来处理 RPC 消息的对应的具体逻辑,将 ReadWriter 实现和 EventLister 实现注入到 RPC 的 Client 和 Server 端,则可实现 RPC 通信。

4.1 编解码协议实现

下面是 seata 协议的定义: image-20201205214556457.png

在 ReadWriter 接口的实现 RpcPackageHandler 中,调用 Codec 方法对消息体按照上面的格式编解码:

// 消息编码为二进制比特
func MessageEncoder(codecType byte, in interface{}) []byte {
switch codecType {
case SEATA:
return SeataEncoder(in)
default:
log.Errorf("not support codecType, %s", codecType)
return nil
}
}

// 二进制比特解码为消息体
func MessageDecoder(codecType byte, in []byte) (interface{}, int) {
switch codecType {
case SEATA:
return SeataDecoder(in)
default:
log.Errorf("not support codecType, %s", codecType)
return nil, 0
}
}

4.2 Client 端实现

再来看 client 端 EventListener 的实现 RpcRemotingClient

func (client *RpcRemoteClient) OnOpen(session getty.Session) error {
go func()
request := protocal.RegisterTMRequest{AbstractIdentifyRequest: protocal.AbstractIdentifyRequest{
ApplicationId: client.conf.ApplicationId,
TransactionServiceGroup: client.conf.TransactionServiceGroup,
}}
// 建立连接后向 Transaction Coordinator 发起注册 TransactionManager 的请求
_, err := client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
if err == nil {
// 将与 Transaction Coordinator 建立的连接保存在连接池供后续使用
clientSessionManager.RegisterGettySession(session)
client.GettySessionOnOpenChannel <- session.RemoteAddr()
}
}()

return nil
}

// OnError ...
func (client *RpcRemoteClient) OnError(session getty.Session, err error) {
clientSessionManager.ReleaseGettySession(session)
}

// OnClose ...
func (client *RpcRemoteClient) OnClose(session getty.Session) {
clientSessionManager.ReleaseGettySession(session)
}

// OnMessage ...
func (client *RpcRemoteClient) OnMessage(session getty.Session, pkg interface{}) {
log.Info("received message:{%v}", pkg)
rpcMessage, ok := pkg.(protocal.RpcMessage)
if ok {
heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
if isHeartBeat && heartBeat == protocal.HeartBeatMessagePong {
log.Debugf("received PONG from %s", session.RemoteAddr())
}
}

if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {
log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)

// 处理事务消息,提交 or 回滚
client.onMessage(rpcMessage, session.RemoteAddr())
} else {
resp, loaded := client.futures.Load(rpcMessage.Id)
if loaded {
response := resp.(*getty2.MessageFuture)
response.Response = rpcMessage.Body
response.Done <- true
client.futures.Delete(rpcMessage.Id)
}
}
}

// OnCron ...
func (client *RpcRemoteClient) OnCron(session getty.Session) {
// 发送心跳
client.defaultSendRequest(session, protocal.HeartBeatMessagePing)
}

clientSessionManager.RegisterGettySession(session) 的逻辑 4.4 小节分析。

4.3 Server 端 Transaction Coordinator 实现

代码见 DefaultCoordinator

func (coordinator *DefaultCoordinator) OnOpen(session getty.Session) error {
log.Infof("got getty_session:%s", session.Stat())
return nil
}

func (coordinator *DefaultCoordinator) OnError(session getty.Session, err error) {
// 释放 TCP 连接
SessionManager.ReleaseGettySession(session)
session.Close()
log.Errorf("getty_session{%s} got error{%v}, will be closed.", session.Stat(), err)
}

func (coordinator *DefaultCoordinator) OnClose(session getty.Session) {
log.Info("getty_session{%s} is closing......", session.Stat())
}

func (coordinator *DefaultCoordinator) OnMessage(session getty.Session, pkg interface{}) {
log.Debugf("received message:{%v}", pkg)
rpcMessage, ok := pkg.(protocal.RpcMessage)
if ok {
_, isRegTM := rpcMessage.Body.(protocal.RegisterTMRequest)
if isRegTM {
// 将 TransactionManager 信息和 TCP 连接建立映射关系
coordinator.OnRegTmMessage(rpcMessage, session)
return
}

heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
if isHeartBeat && heartBeat == protocal.HeartBeatMessagePing {
coordinator.OnCheckMessage(rpcMessage, session)
return
}

if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {
log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)
_, isRegRM := rpcMessage.Body.(protocal.RegisterRMRequest)
if isRegRM {
// 将 ResourceManager 信息和 TCP 连接建立映射关系
coordinator.OnRegRmMessage(rpcMessage, session)
} else {
if SessionManager.IsRegistered(session) {
defer func() {
if err := recover(); err != nil {
log.Errorf("Catch Exception while do RPC, request: %v,err: %w", rpcMessage, err)
}
}()
// 处理事务消息,全局事务注册、分支事务注册、分支事务提交、全局事务回滚等
coordinator.OnTrxMessage(rpcMessage, session)
} else {
session.Close()
log.Infof("close a unhandled connection! [%v]", session)
}
}
} else {
resp, loaded := coordinator.futures.Load(rpcMessage.Id)
if loaded {
response := resp.(*getty2.MessageFuture)
response.Response = rpcMessage.Body
response.Done <- true
coordinator.futures.Delete(rpcMessage.Id)
}
}
}
}

func (coordinator *DefaultCoordinator) OnCron(session getty.Session) {

}

coordinator.OnRegTmMessage(rpcMessage, session) 注册 Transaction Manager,coordinator.OnRegRmMessage(rpcMessage, session) 注册 Resource Manager。具体逻辑分析见 4.4 小节。 消息进入 coordinator.OnTrxMessage(rpcMessage, session) 方法,将按照消息的类型码路由到具体的逻辑当中:

	switch msg.GetTypeCode() {
case protocal.TypeGlobalBegin:
req := msg.(protocal.GlobalBeginRequest)
resp := coordinator.doGlobalBegin(req, ctx)
return resp
case protocal.TypeGlobalStatus:
req := msg.(protocal.GlobalStatusRequest)
resp := coordinator.doGlobalStatus(req, ctx)
return resp
case protocal.TypeGlobalReport:
req := msg.(protocal.GlobalReportRequest)
resp := coordinator.doGlobalReport(req, ctx)
return resp
case protocal.TypeGlobalCommit:
req := msg.(protocal.GlobalCommitRequest)
resp := coordinator.doGlobalCommit(req, ctx)
return resp
case protocal.TypeGlobalRollback:
req := msg.(protocal.GlobalRollbackRequest)
resp := coordinator.doGlobalRollback(req, ctx)
return resp
case protocal.TypeBranchRegister:
req := msg.(protocal.BranchRegisterRequest)
resp := coordinator.doBranchRegister(req, ctx)
return resp
case protocal.TypeBranchStatusReport:
req := msg.(protocal.BranchReportRequest)
resp := coordinator.doBranchReport(req, ctx)
return resp
default:
return nil
}

4.4 session manager 分析

Client 端同 Transaction Coordinator 建立连接起连接后,通过 clientSessionManager.RegisterGettySession(session) 将连接保存在 serverSessions = sync.Map{} 这个 map 中。map 的 key 为从 session 中获取的 RemoteAddress 即 Transaction Coordinator 的地址,value 为 session。这样,Client 端就可以通过 map 中的一个 session 来向 Transaction Coordinator 注册 Transaction Manager 和 Resource Manager 了。具体代码见 getty_client_session_manager.go Transaction Manager 和 Resource Manager 注册到 Transaction Coordinator 后,一个连接既有可能用来发送 TM 消息也有可能用来发送 RM 消息。我们通过 RpcContext 来标识一个连接信息:

type RpcContext struct {
Version string
TransactionServiceGroup string
ClientRole meta.TransactionRole
ApplicationId string
ClientId string
ResourceSets *model.Set
Session getty.Session
}

当收到事务消息时,我们需要构造这样一个 RpcContext 供后续事务处理逻辑使用。所以,我们会构造下列 map 来缓存映射关系:

var (
// session -> transactionRole
// TM will register before RM, if a session is not the TM registered,
// it will be the RM registered
session_transactionroles = sync.Map{}

// session -> applicationId
identified_sessions = sync.Map{}

// applicationId -> ip -> port -> session
client_sessions = sync.Map{}

// applicationId -> resourceIds
client_resources = sync.Map{}
)

这样,Transaction Manager 和 Resource Manager 分别通过 coordinator.OnRegTmMessage(rpcMessage, session)coordinator.OnRegRmMessage(rpcMessage, session) 注册到 Transaction Coordinator 时,会在上述 client_sessions map 中缓存 applicationId、ip、port 与 session 的关系,在 client_resources map 中缓存 applicationId 与 resourceIds(一个应用可能存在多个 Resource Manager) 的关系。在需要时,我们就可以通过上述映射关系构造一个 RpcContext。这部分的实现和 java 版 seata 有很大的不同,感兴趣的可以深入了解一下。具体代码见 getty_session_manager.go 至此,我们就分析完了 seata-golang 整个 RPC 通信模型的机制。

三、seata-golang 的未来

seata-golang  从今年 4 月份开始开发,到 8 月份基本实现和 java 版 seata 1.2 协议的互通,对 mysql 数据库实现了 AT 模式(自动协调分布式事务的提交回滚),实现了 TCC 模式,TC 端使用 mysql 存储数据,使 TC 变成一个无状态应用支持高可用部署。下图展示了 AT 模式的原理:image20201205-232516.png

后续,还有许多工作可以做,比如:对注册中心的支持、对配置中心的支持、和 java 版 seata 1.4 的协议互通、其他数据库的支持、raft transaction coordinator 的实现等,希望对分布式事务问题感兴趣的开发者可以加入进来一起来打造一个完善的 golang 的分布式事务框架。

如果你有任何疑问,欢迎钉钉扫码加入交流群【钉钉群号 33069364】:

作者简介

刘晓敏 (GitHubID dk-lockdown),目前就职于 h3c 成都分公司,擅长使用 Java/Go 语言,在云原生和微服务相关技术方向均有涉猎,目前专攻分布式事务。 于雨(github @AlexStocks),dubbo-go 项目和社区负责人,一个有十多年服务端基础架构研发一线工作经验的程序员,陆续参与改进过 Muduo/Pika/Dubbo/Sentinel-go 等知名项目,目前在蚂蚁金服可信原生部从事容器编排和 service mesh 工作。

参考资料

seata 官方:https://seata.apache.org

java 版 seata:https://github.com/apache/incubator-seata

seata-golang 项目地址:https://github.com/apache/incubator-seata-go

seata-golang go 夜读 b站分享:https://www.bilibili.com/video/BV1oz411e72T

· 阅读需 37 分钟

在Seata1.3.0版本中,数据源自动代理和手动代理一定不能混合使用,否则会导致多层代理,从而导致以下问题:

  1. 单数据源情况下:导致分支事务提交时,undo_log本身也被代理,即为 undo_log 生成了 undo_log, 假设为undo_log2,此时undo_log将被当作分支事务来处理;分支事务回滚时,因为undo_log2生成的有问题,在undo_log对应的事务分支回滚时会将业务表关联的undo_log也一起删除,从而导致业务表对应的事务分支回滚时发现undo_log不存在,从而又多生成一条状态为1的undo_log。这时候整体逻辑已经乱了,很严重的问题
  2. 多数据源和逻辑数据源被代理情况下:除了单数据源情况下会出现的问题,还可能会造成死锁问题。死锁的原因就是针对undo_log的操作,本该在一个事务中执行的select for updatedelete 操作,被分散在多个事务中执行,导致一个事务在执行完select for update后一直不提交,一个事务在执行delete时一直等待锁,直到超时

代理描述

即对DataSource代理一层,重写一些方法。比如getConnection方法,这时不直接返回一个Connection,而是返回ConnectionProxy,其它的以此类推

// DataSourceProxy

public DataSourceProxy(DataSource targetDataSource) {
this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);
}

private void init(DataSource dataSource, String resourceGroupId) {
DefaultResourceManager.get().registerResource(this);
}

public Connection getPlainConnection() throws SQLException {
return targetDataSource.getConnection();
}

@Override
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}

手动代理

即手动注入一个DataSourceProxy,如下

@Bean
public DataSource druidDataSource() {
return new DruidDataSource()
}

@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}

自动代理

针对DataSource创建一个代理类,在代理类里面基于DataSource获取DataSourceProxy(如果没有就创建),然后调用DataSourceProxy的相关方法。核心逻辑在SeataAutoDataSourceProxyCreator

public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);
private final String[] excludes;
private final Advisor advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice());

public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes) {
this.excludes = excludes;
setProxyTargetClass(!useJdkProxy);
}

@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Auto proxy of [{}]", beanName);
}
return new Object[]{advisor};
}

@Override
protected boolean shouldSkip(Class<?> beanClass, String beanName) {
return SeataProxy.class.isAssignableFrom(beanClass) ||
DataSourceProxy.class.isAssignableFrom(beanClass) ||
!DataSource.class.isAssignableFrom(beanClass) ||
Arrays.asList(excludes).contains(beanClass.getName());
}
}

public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis());
Method method = invocation.getMethod();
Object[] args = invocation.getArguments();
Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
if (m != null) {
return m.invoke(dataSourceProxy, args);
} else {
return invocation.proceed();
}
}

@Override
public Class<?>[] getInterfaces() {
return new Class[]{SeataProxy.class};
}
}

数据源多层代理

@Bean
@DependsOn("strangeAdapter")
public DataSource druidDataSource(StrangeAdapter strangeAdapter) {
doxx
return new DruidDataSource()
}

@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
  1. 首先我们在配置类里面注入了两个DataSource,分别为: DruidDataSourceDataSourceProxy, 其中DruidDataSource 作为 DataSourceProxy 的 targetDataSource 属性,并且DataSourceProxy为使用了@Primary注解声明
  2. 应用默认开启了数据源自动代理,所以在调用DruidDataSource相关方法时,又会为为DruidDataSource创建一个对应的数据源代理DataSourceProxy2
  3. 当我们在程序中想获取一个Connection时会发生什么?
    1. 先获取一个DataSource,因为DataSourceProxyPrimary,所以此时拿到的是DataSourceProxy
    2. 基于DataSource获取一个Connection,即通过DataSourceProxy获取Connection。此时会先调用targetDataSource 即 DruidDataSource 的 getConnection 方法,但因为切面会对DruidDataSource进行拦截,根据步骤2的拦截逻辑可以知道,此时会自动创建一个DataSourceProxy2,然后调用DataSourceProxy2#getConnection,然后再调用DruidDataSource#getConnection。最终形成了双层代理, 返回的Connection也是一个双层的ConnectionProxy

上面其实是改造之后的代理逻辑,Seata默认的自动代理会对DataSourceProxy再次进行代理,后果就是代理多了一层此时对应的图如下

数据源多层代理会导致的两个问题在文章开头处已经总结了,下面会有案例介绍。

分支事务提交

通过ConnectionProxy中执行对应的方法,会发生什么?以update操作涉及到的一个分支事务提交为例:

  1. 执行ConnectionProxy#prepareStatement, 返回一个PreparedStatementProxy
  2. 执行PreparedStatementProxy#executeUpdatePreparedStatementProxy#executeUpdate大概会帮做两件事情: 执行业务SQL和提交undo_log

提交业务SQL

// ExecuteTemplate#execute
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
} else {
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}

主要流程就是: 先执行业务SQL,然后执行ConnectionProxy的commit方法,在这个方法中,会先帮我们执行对应的 undo_log SQL,然后提交事务

PreparedStatementProxy#executeUpdate => 
ExecuteTemplate#execute =>
BaseTransactionalExecutor#execute =>
AbstractDMLBaseExecutor#doExecute =>
AbstractDMLBaseExecutor#executeAutoCommitTrue =>
AbstractDMLBaseExecutor#executeAutoCommitFalse => 在这一步操中,会触发statementCallback#execute方法,即调用调用原生PreparedStatement#executeUpdate方法
ConnectionProxy#commit
ConnectionProxy#processGlobalTransactionCommit

UNDO_LOG插入

// ConnectionProxy#processGlobalTransactionCommit
private void processGlobalTransactionCommit() throws SQLException {
try {
// 注册分支事务,简单理解向server发一个请求,然后server在branch_table表里插入一条记录,不关注
register();
} catch (TransactionException e) {
// 如果没有for update 的sql,会直接在commit之前做注册,此时不止插入一条branch记录,而会附带锁信息进行竞争,下方的异常一般就是在注册时没拿到锁抛出,一般就是纯update语句的并发下会触发竞争锁失败的异常 @FUNKYE
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
// undo_log处理,期望用 targetConnection 处理 @1
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);

// 提交本地事务,期望用 targetConnection 处理 @2
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}
  1. undo_log处理@1,解析当前事务分支涉及到的undo_log,然后使用TargetConnection, 写到数据库
public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
ConnectionContext connectionContext = cp.getContext();
if (!connectionContext.hasUndoLog()) {
return;
}

String xid = connectionContext.getXid();
long branchId = connectionContext.getBranchId();

BranchUndoLog branchUndoLog = new BranchUndoLog();
branchUndoLog.setXid(xid);
branchUndoLog.setBranchId(branchId);
branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());

UndoLogParser parser = UndoLogParserFactory.getInstance();
byte[] undoLogContent = parser.encode(branchUndoLog);

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
}

insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName()), undoLogContent,cp.getTargetConnection());
}
  1. 提交本地事务@2,即通过TargetConnection提交事务。即 务SQL执行undo_log写入即事务提交 用的都是同一个TargetConnection

lcn的内置数据库方案,lcn是将undolog写到他内嵌的h2(忘了是不是这个来着了)数据库上,此时会变成2个本地事务,一个是h2的undolog插入事务,一个是业务数据库的事务,如果在h2插入后,业务数据库异常,lcn的方案就会出现数据冗余,回滚数据的时候也是一样,删除undolog跟回滚业务数据不是一个本地事务. 但是lcn这样的好处就是入侵小,不需要另外添加undolog表。 感谢@FUNKYE大佬给的建议,对lcn不太了解,有机会好好研究一下

分支事务回滚

  1. Server端向Client端发送回滚请求
  2. Client端接收到Server发过来的请求,经过一系列处理,最终会到DataSourceManager#branchRollback方法
  3. 先根据resourceId从DataSourceManager.dataSourceCache中获取对应的DataSourceProxy,此时为masterSlaveProxy(回滚阶段我们就不考代理数据源问题,简单直接一些,反正最终拿到的都是TragetConnection)
  4. 根据Server端发过来的xid和branchId查找对应的undo_log并解析其rollback_info属性,每条undo_log可能会解析出多条SQLUndoLog,每个SQLUndoLog可以理解成是一个操作。比如一个分支事务先更新A表,再更新B表,这时候针对该分支事务生成的undo_log就包含两个SQLUndoLog:第一个SQLUndoLog对应的是更新A表的前后快照;第二个SQLUndoLog对应的是更新B表的前后快照
  5. 针对每条SQLUndoLog执行对应的回滚操作,比如一个SQLUndoLog对应的操作是INSERT,则其对应的回滚操作就是DELETE
  6. 根据xid和branchId删除该undo_log
// AbstractUndoLogManager#undo   删除了部分非关键代码

public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;

for (; ; ) {
try {
// 获取原生数据源的Connection, 回滚阶段我们不管代理数据源问题,最终拿到的都是 TargetConnection
conn = dataSourceProxy.getPlainConnection();

// 将回滚操作放在一个本地事务中,手动提交,确保最终业务SQL操作和undo_log删除操作一起提交
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}

// 根据xid 和 branchId 查询 undo_log,注意此时的SQL语句 SELECT * FROM undo_log WHERE branch_id = ? AND xid = ? FOR UPDATE
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
rs = selectPST.executeQuery();

boolean exists = false;
while (rs.next()) {
exists = true;
// status == 1 undo_log不处理,和防悬挂相关
if (!canUndo(state)) {
return;
}

// 解析undo_log
byte[] rollbackInfo = getRollbackInfo(rs);
BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance(serializer).decode(rollbackInfo);
try {
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
Collections.reverse(sqlUndoLogs);
}
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
// 执行对应的回滚操作
undoExecutor.executeOn(conn);
}
}
}

//
if (exists) {
LOGGER.error("\n delete from undo_log where xid={} AND branchId={} \n", xid, branchId);
deleteUndoLog(xid, branchId, conn);
conn.commit();
// 和防悬挂相关 如果根据 xid和branchId 没有查到undo_log,说明这个分支事务有异常:例如业务处理超时,导致全局事务回滚,但这时候业务undo_log并没有插入
} else {
LOGGER.error("\n insert into undo_log xid={},branchId={} \n", xid, branchId);
insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
conn.commit();
}
return;
} catch (Throwable e) {
throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,branchId, e.getMessage()), e);
}
}
}

有以下几个注意点:

  1. 回滚时不考虑数据源代理问题,最终都是使用TargetConnection
  2. 设置atuoCommit为false,即需要手动提交事务
  3. 根据xid和branchId查询undo_log时加了for update,也就是说,这个事务会持有这条undo_log的锁直到所有回滚操作都完成,因为完成之后才会

多层代理问题

数据源多层代理会导致的几个问题在文章开头的时候已经提到过了,重点分析一下为什么会造成以上问题:

对分支事务提交的影响

先分析一下,如果使用双层代理会发生什么?我们从两个方面来分析:业务SQLundo_log

  1. 业务SQL
PreparedStatementProxy1.executeUpdate => 
statementCallback#executeUpdate(PreparedStatementProxy2#executeUpdate) =>
PreparedStatement#executeUpdate

好像没啥影响,就是多绕了一圈,最终还是通过PreparedStatement执行

  1. undo_log
ConnectionProxy1#getTargetConnection -> 
ConnectionProxy2#prepareStatement ->
PreparedStatementProxy2#executeUpdate ->
PreparedStatement#executeUpdate(原生的undo_log写入,在此之前会对为该 undo_log 生成 undo_log2(即 undo_log 的 undo_log)) ->
ConnectionProxy2#commit ->
ConnectionProxy2#processGlobalTransactionCommit(写入undo_log2) ->
ConnectionProxy2#getTargetConnection ->
TargetConnection#prepareStatement ->
PreparedStatement#executeUpdate

对分支事务回滚的影响

在事务回滚之后,为何undo_log没有被删除呢?

其实并不是没有被删除。前面已经说过,双层代理会导致undo_log被当作分支事务来处理,所以也会为该 undo_log生成一个undo_log(假设为undo_log2),而undo_log2生成的有问题(其实也没问题,就应该这样生成),从而导致回滚时会将业务表关联的undo_log也一起删除,最终导致业务表对应的事务分支回滚时发现undo_log不存在,从而又多生成一条状态为为1的undo_log

回滚之前

// undo_log
84 59734070967644161 172.16.120.59:23004:59734061438185472 serializer=jackson 1.1KB 0
85 59734075254222849 172.16.120.59:23004:59734061438185472 serializer=jackson 4.0KB 0

// branch_table
59734070967644161 172.16.120.59:23004:59734061438185472 jdbc:mysql://172.16.248.10:3306/tuya_middleware
59734075254222849 172.16.120.59:23004:59734061438185472 jdbc:mysql://172.16.248.10:3306/tuya_middleware

// lock_table
jdbc:mysql://xx^^^seata_storage^^^1 59734070967644161 jdbc:mysql://172.16.248.10:3306/tuya_middleware seata_storage 1
jdbc:mysql://xx^^^undo_log^^^84 59734075254222849 jdbc:mysql://172.16.248.10:3306/tuya_middleware undo_log 84

回滚之后

// 生成了一条状态为1的undo_log,对应的日志为: undo_log added with GlobalFinished
86 59734070967644161 172.16.120.59:23004:59734061438185472 serializer=jackson 1.0Byte 1

问题分析

  1. 根据xid和branchId找到对应的undo_log日志
  2. 对undo_log进行解析,主要就是解析它的rollback_info字段,rollback_info解析出来就是一个SQLUndoLog集合,每条SQLUndoLog对应着一个操作,里面包含了该操作的前后的快照,然后执行对应的回滚
  3. 根据xid和branchId删除undo_log日志

因为双层代理问题,导致一条undo_log变成了一个分支事务,所以发生回滚时,我们也需要对undo_log分支事务进行回滚: 1、首先根据xid和branchId找到对应的undo_log并解析其rollback_info属性,这里解析出来的rollback_info包含了两条SQLUndoLog。为什么有两条?

仔细想想也可以可以理解,第一层代理针对seata_storage的操作,放到缓存中,本来执行完之后是需要清掉的,但因为这里是双层代理,所以这时候这个流程并没有结束。轮到第二层代理对undo_log操作时,将该操作放到缓存中,此时缓存中有两个操作,分别为seata_storage的UPDATEundo_log的INSERT。所以这也就很好理解为什么针对undo_log操作的那条undo_log格外大(4KB),因为它的rollback_info包含了两个操作。

有一点需要注意的是,第一条SQLUndoLog对应的after快照,里面的branchId=59734070967644161 pk=84, 即 seata_storage分支对应的branchIdseata_storage对应的undo_log PK。也就是说,undo_log回滚时候 把seata_storage对应的undo_log删掉了。 那undo_log本身对应的undo_log 如何删除呢?在接下来的逻辑中会根据xid和branchId删除

2、解析第一条SQLUndoLog,此时对应的是undo_log的INSERT操作,所以其对应的回滚操作是DELETE。因为undo_log此时被当作了业务表。所以这一步会将59734075254222849对应的undo_log删除,但这个其实是业务表对应的对应的undo_log

3、解析第二条SQLUndoLog,此时对应的是seata_storage的UPDATE操作,这时会通过快照将seata_storage对应的记录恢复

4、根据xid和branchId删除undo_log日志,这里删除的是undo_log 的 undo_log , 即 undo_log2。所以,执行到这里,两条undo_log就已经被删除了

5、接下来回滚seata_storage,因为这时候它对应的undo_log已经在步骤2删掉了,所以此时查不到undo_log,然后重新生成一条status == 1 的 undo_log

案例分析

背景

1、配置了三个数据源: 两个物理数据源、一个逻辑数据源,但是两个物理数据源对应的连接地址是一样的。这样做有意思吗?

@Bean("dsMaster")
DynamicDataSource dsMaster() {
return new DynamicDataSource(masterDsRoute);
}

@Bean("dsSlave")
DynamicDataSource dsSlave() {
return new DynamicDataSource(slaveDsRoute);
}

@Primary
@Bean("masterSlave")
DataSource masterSlave(@Qualifier("dsMaster") DataSource dataSourceMaster,
@Qualifier("dsSlave") DataSource dataSourceSlave) throws SQLException {
Map<String, DataSource> dataSourceMap = new HashMap<>(2);
//主库
dataSourceMap.put("dsMaster", dataSourceMaster);
//从库
dataSourceMap.put("dsSlave", dataSourceSlave);
// 配置读写分离规则
MasterSlaveRuleConfiguration masterSlaveRuleConfig = new MasterSlaveRuleConfiguration(
"masterSlave", "dsMaster", Lists.newArrayList("dsSlave")
);
Properties shardingProperties = new Properties();
shardingProperties.setProperty("sql.show", "true");
shardingProperties.setProperty("sql.simple", "true");
// 获取数据源对象
DataSource dataSource = MasterSlaveDataSourceFactory.createDataSource(dataSourceMap, masterSlaveRuleConfig, shardingProperties);
log.info("datasource initialized!");
return dataSource;˚
}

2、开启seata的数据源动态代理,根据seata的数据源代理逻辑可以知道,最终会生成三个代理数据源,原生数据源和代理数据源的关系缓存在DataSourceProxyHolder.dataSourceProxyMap中,假如原生数据源和代理数据源对应的关系如下:

dsMaster(DynamicDataSource)           =>       dsMasterProxy(DataSourceProxy)
dsSlave(DynamicDataSource) => dsSlaveProxy(DataSourceProxy)
masterSlave(MasterSlaveDataSource) => masterSlaveProxy(DataSourceProxy)

所以,最终在IOC容器中存在的数据源是这三个: dsMasterProxy 、 dsSlaveProxy 、 masterSlaveProxy 。根据@Primary的特性可以知道,当我们从容器中获取一个DataSource的时候,默认返回的就是代理数据源 masterSlaveProxy

对shardingjdbc没有具体的研究过,只是根据debug时看到的代码猜测它的工作机制,又不对的地方,还请大佬指出来

masterSlaveProxy可以看成是被 DataSourceProxy 包装后的 MasterSlaveDataSource。我们可以大胆的猜测MasterSlaveDataSource并不是一个物理数据源,而是一个逻辑数据源,可以简单的认为里面包含了路由的逻辑。当我们获取一个连接时,会通过里面的路由规则选择到具体的物理数据源,然后通过该物理数据源获取一个真实的连接。 路由规则应该可以自己定义,根据debug时观察到的现象,默认的路由规则应该是:

  1. 针对select 读操作,会路由到从库,即我们的 dsSlave
  2. 针对update 写操作,会路由到主库,即我们的 dsMaster

3、每个DataSourceProxy在初始化的时候,会解析该真实DataSource的连接地址,然后将该连接地址和DataSourceProxy本身维护DataSourceManager.dataSourceCache中。DataSourceManager.dataSourceCache有一个作用是用于回滚:回滚时根据连接地址找到对应的DataSourceProxy,然后基于该DataSourceProxy做回滚操作。 但我们可以发现这个问题,这三个数据源解析出来的连接地址是一样的,也就是key重复了,所以在DataSourceManager.dataSourceCache中中,当连接地相同时,后注册的数据源会覆盖已存在的。即: DataSourceManager.dataSourceCache最终存在的是masterSlaveProxy,也就是说,最终会通过masterSlaveProxy进行回滚,这点很重要。

4、涉及到的表:很简单,我们期待的就一个业务表seata_account,但因为重复代理问题,导致seata将undo_log也当成了一个业务表

  1. seata_account
  2. undo_log

好了,这里简单介绍一下背景,接下来进入Seata环节

需求

我们的需求很简单,就是在分支事务里面执行一条简单的update操作,更新seata_account的count值。在更新完之后,手动抛出一个异常,触发全局事务的回滚。 为了更便于排查问题,减少干扰,我们全局事务中就使用一个分支事务,没有其它分支事务了。SQL如下:

update seata_account set count = count - 1 where id = 100;

问题现象

Client:在控制台日志中,不断重复打印以下日志

  1. 以上日志打印的间隔为20s,而我查看了数据库的innodb_lock_wait_timeout属性值,刚好就是20,说明每次回滚请求过来的时候,都因为获取锁超时(20)而回滚失败
  2. 为什么会没过20s打印一次?因为Server端会有定时处理回滚请求
// 分支事务开始回滚
Branch rollback start: 172.16.120.59:23004:59991911632711680 59991915571163137 jdbc:mysql://172.16.248.10:3306/tuya_middleware

// undo_log事务分支 原始操作对应是 insert, 所以其回滚为 delete
undoSQL undoSQL=DELETE FROM undo_log WHERE id = ? , PK=[[id,139]]
// 因为第一层代理对应的操作也在上下文中,undo_log分支事务 提交时候, 对应的undo_log包含两个操作
undoSQL undoSQL=UPDATE seata_account SET money = ? WHERE id = ? , PK=[[id,1]]

// 该分支事务回滚完成之后,再删除该分支事务的对应的 undo_log
delete from undo_log where xid=172.16.120.59:23004:59991911632711680 AND branchId=59991915571163137

// 抛出异常,提示回滚失败,失败原因是`Lock wait timeout exceeded`, 在根据xid和branchId删除undo_log时失败,失败原因是获取锁超时,说明此时有另一个操作持有该记录的锁没有释放
branchRollback failed. branchType:[AT], xid:[172.16.120.59:23004:59991911632711680], branchId:[59991915571163137], resourceId:[jdbc:mysql://172.16.248.10:3306/tuya_middleware], applicationData:[null]. reason:[Branch session rollback failed and try again later xid = 172.16.120.59:23004:59991911632711680 branchId = 59991915571163137 Lock wait timeout exceeded; try restarting transaction]

Server:每20s打印以下日志,说明server在不断的重试发送回滚请求

Rollback branch transaction fail and will retry, xid = 172.16.120.59:23004:59991911632711680 branchId = 59991915571163137

在该过程中,涉及到的SQL大概如下:

1. SELECT * FROM undo_log WHERE branch_id = ? AND xid = ? FOR UPDATE							slaveDS
2. SELECT * FROM undo_log WHERE (id ) in ( (?) ) slaveDS
3. DELETE FROM undo_log WHERE id = ? masterDS
4. SELECT * FROM seata_account WHERE (id ) in ( (?) ) masterDS
5. UPDATE seata_account SET money = ? WHERE id = ? masterDS
6. DELETE FROM undo_log WHERE branch_id = ? AND xid = ? masterDS

此时查看数据库的 事务情况、锁情况 、锁等待关系 1、查当前正在执行的事务

SELECT * FROM information_schema.INNODB_TRX;

2、查当前锁情况

SELECT * FROM information_schema.INNODB_LOCKs;

3、查当前锁等待关系

SELECT * FROM information_schema.INNODB_LOCK_waits;

SELECT
block_trx.trx_mysql_thread_id AS 已经持有锁的sessionID,
request_trx.trx_mysql_thread_id AS 正在申请锁的sessionID,
block_trx.trx_query AS 已经持有锁的SQL语句,
request_trx.trx_query AS 正在申请锁的SQL语句,
waits.blocking_trx_id AS 已经持有锁的事务ID,
waits.requesting_trx_id AS 正在申请锁的事务ID,
waits.requested_lock_id AS 锁对象的ID,
locks.lock_table AS lock_table, -- 锁对象所锁定的表
locks.lock_type AS lock_type, -- 锁类型
locks.lock_mode AS lock_mode -- 锁模式
FROM
information_schema.innodb_lock_waits AS waits
INNER JOIN information_schema.innodb_trx AS block_trx ON waits.blocking_trx_id = block_trx.trx_id
INNER JOIN information_schema.innodb_trx AS request_trx ON waits.requesting_trx_id = request_trx.trx_id
INNER JOIN information_schema.innodb_locks AS locks ON waits.requested_lock_id = locks.lock_id;

  1. 涉及到到记录为 branch_id = 59991915571163137 AND xid = 172.16.120.59:23004:59991911632711680
  2. 事务ID1539483284持有该记录的锁,但是它对应的SQL为空,那应该是在等待commit
  3. 事务ID1539483286在尝试获取该记录的锁,但从日志可以发现,它一直锁等待超时

大概可以猜测是 select for updatedelete from undo ... 发生了冲突。根据代码中的逻辑,这两个操作应该是放在一个事务中提交了,为什么被分开到两个事务了?

问题分析

结合上面的介绍的回滚流程看看我们这个例子在回滚时会发生什么

  1. 先获取数据源,此时dataSourceProxy.getPlainConnection()获取到的是MasterSlaveDataSource数据源
  2. select for update操作的时候,通过MasterSlaveDataSource获取一个Connection,前面说到过MasterSlaveDataSource是一个逻辑数据源,里面有路由逻辑,根据上面介绍的,这时候拿到的是dsSlaveConnection
  3. 在执行delete from undo ...操作的时候,这时候拿到的是dsMasterConnection
  4. 虽然dsSlavedsMaster对应的是相同的地址,但此时获取到的肯定是不同的连接,所以此时两个操作肯定是分布在两个事务中
  5. 执行select for update的事务,会一直等待直到删除undo_log完成才会提交
  6. 执行delete from undo ...的事务,会一直等待select for update的事务释放锁
  7. 典型的死锁问题

验证猜想

我尝试用了两个方法验证这个问题:

  1. 修改Seata代码,将select for update改成select,此时在查询undo_log就不需要持有该记录的锁,也就不会造成死锁

  2. 修改数据源代理逻辑,这才是问题的关键,该问题主要原因不是select for update。在此之前多层代理问题已经产生,然后才会造成死锁问题。从头到尾我们就不应该对masterSlave数据源进行代理。它只是一个逻辑数据源,为什么要对它进行代理呢?如果代理masterSlave,就不会造成多层代理问题,也就不会造成删除undo_log时的死锁问题

最终实现

masterSlave也是一个DataSource类型,该如何仅仅对dsMasterdsSlave 代理而不对masterSlave代理呢?观察SeataAutoDataSourceProxyCreator#shouldSkip方法,我们可以通过EnableAutoDataSourceProxy注解的excludes属性解决这个问题

@Override
protected boolean shouldSkip(Class<?> beanClass, String beanName) {
return SeataProxy.class.isAssignableFrom(beanClass) ||
DataSourceProxy.class.isAssignableFrom(beanClass) ||
!DataSource.class.isAssignableFrom(beanClass) ||
Arrays.asList(excludes).contains(beanClass.getName());
}

即: 将数据源自动代理关闭,然后在启动类加上这个注解

@EnableAutoDataSourceProxy(excludes = {"org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.MasterSlaveDataSource"})

自动代理在新版本中的优化

因为Seata 1.4.0还没有正式发布,我目前看的是1.4.0-SNAPSHOT版本的代码,即当前时间ddevelop分支最新的代码

代码改动

主要改动如下,一些小的细节就不过多说明了:

  1. DataSourceProxyHolder调整
  2. DataSourceProxy调整
  3. SeataDataSourceBeanPostProcessor新增

DataSourceProxyHolder

在这个类改动中,最主要是其putDataSource方法的改动

public SeataDataSourceProxy putDataSource(DataSource dataSource, BranchType dataSourceProxyMode) {
DataSource originalDataSource;
if (dataSource instanceof SeataDataSourceProxy) {
SeataDataSourceProxy dataSourceProxy = (SeataDataSourceProxy) dataSource;
// 如果是代理数据源,并且和当前应用配置的数据源代理模式(AT/XA)一样, 则直接返回
if (dataSourceProxyMode == dataSourceProxy.getBranchType()) {
return (SeataDataSourceProxy)dataSource;
}

// 如果是代理数据源,和当前应用配置的数据源代理模式(AT/XA)不一样,则需要获取其TargetDataSource,然后为其创建一个代理数据源
originalDataSource = dataSourceProxy.getTargetDataSource();
} else {
originalDataSource = dataSource;
}

// 如果有必要,基于 TargetDataSource 创建 代理数据源
return this.dataSourceProxyMap.computeIfAbsent(originalDataSource,
BranchType.XA == dataSourceProxyMode ? DataSourceProxyXA::new : DataSourceProxy::new);
}

DataSourceProxyHolder#putDataSource方法主要在两个地方被用到:一个是在SeataAutoDataSourceProxyAdvice切面中;一个是在SeataDataSourceBeanPostProcessor中。 这段判断为我们解决了什么问题?数据源多层代理问题。在开启了数据源自动代理的前提下,思考以下场景:

  1. 如果我们在项目中手动注入了一个DataSourceProxy,这时候在切面调用DataSourceProxyHolder#putDataSource方法时会直接返回该DataSourceProxy本身,而不会为其再创建一个DataSourceProxy
  2. 如果我们在项目中手动注入了一个DruidSource,这时候在切面调用DataSourceProxyHolder#putDataSource方法时会为其再创建一个DataSourceProxy并返回

这样看好像问题已经解决了,有没有可能会有其它的问题呢?看看下面的代码

@Bean
public DataSourceProxy dsA(){
return new DataSourceProxy(druidA)
}

@Bean
public DataSourceProxy dsB(DataSourceProxy dsA){
return new DataSourceProxy(dsA)
}
  1. 这样写肯定是不对,但如果他就要这样写你也没办法
  2. dsA没什么问题,但dsB还是会产生双层代理的问题,因为此时dsB 的 TargetDataSourcedsA
  3. 这就涉及到DataSourceProxy的改动

DataSourceProxy

public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
// 下面这个判断,保证了在我们传入一个DataSourceProxy的时候,也不会产生双层代理问题
if (targetDataSource instanceof SeataDataSourceProxy) {
LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());
targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
}
this.targetDataSource = targetDataSource;
init(targetDataSource, resourceGroupId);
}

SeataDataSourceBeanPostProcessor

public class SeataDataSourceBeanPostProcessor implements BeanPostProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataDataSourceBeanPostProcessor.class);

......

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DataSource) {
//When not in the excludes, put and init proxy.
if (!excludes.contains(bean.getClass().getName())) {
//Only put and init proxy, not return proxy.
DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
}

//If is SeataDataSourceProxy, return the original data source.
if (bean instanceof SeataDataSourceProxy) {
LOGGER.info("Unwrap the bean of the data source," +
" and return the original data source to replace the data source proxy.");
return ((SeataDataSourceProxy) bean).getTargetDataSource();
}
}
return bean;
}
}
  1. SeataDataSourceBeanPostProcessor实现了BeanPostProcessor接口,在一个bean初始化后,会执行BeanPostProcessor#postProcessAfterInitialization方法。也就是说,在postProcessAfterInitialization方法中,这时候的bean已经是可用状态了
  2. 为什么要提供这么一个类呢?从它的代码上来看,仅仅是为了再bean初始化之后,为数据源初始化对应的DataSourceProxy,但为什么要这样做呢?

因为有些数据源在应用启动之后,可能并不会初始化(即不会调用数据源的相关方法)。如果没有提供SeataDataSourceBeanPostProcessor类,那么就只有在SeataAutoDataSourceProxyAdvice切面中才会触发DataSourceProxyHolder#putDataSource方法。假如有一个客户端在回滚的时候宕机了,在重启之后,Server端通过定时任务向其派发回滚请求,这时候客户端需要先根据rsourceId(连接地址)找到对应的DatasourceProxy。但如果在此之前客户端还没有主动触发数据源的相关方法,就不会进入SeataAutoDataSourceProxyAdvice切面逻辑,也就不会为该数据源初始化对应的DataSourceProxy,从而导致回滚失败

多层代理总结

通过上面的分析,我们大概已经知道了seata在避免多层代理上的一些优化,但其实还有一个问题需要注意:逻辑数据源的代理

这时候的调用关系为: masterSlaveProxy -> masterSlave -> masterproxy/slaveProxy -> master/slave

此时可以通过excludes属性排除逻辑数据源,从而不为其创建数据源代理。

总结一下:

  1. 在为数据源初始化对应的DataSourceProxy时,判断是否有必要为其创建对应的DataSourceProxy,如果本身就是DataSourceProxy,就直接返回
  2. 针对一些数据源手动注入的情况,为了避免一些人为误操作的导致的多层代理问题,在DataSourceProxy构造函数中添加了判断,如果入参TragetDatasource本身就是一个DataSourceProxy, 则获取其target属性作为新DataSourceProxy的tragetDatasource
  3. 针对一些其它情况,比如逻辑数据源代理问题,通过excludes属性添加排除项,这样可以避免为逻辑数据源创建DataSourceProxy

全局事务和本地事务使用建议

有一个问题,如果在一个方法里涉及到多个DB操作,比如涉及到3条update操作,我们需不需在这个方法使用spring中的@Transactional注解?针对这个问题,我们分别从两个角度考虑:不使用@Transactional注解 和 使用@Transactional注解

不使用@Transactional注解

  1. 在提交阶段,因为该分支事务有3条update操作,每次执行update操作的时候,都会通过数据代理向TC注册一个分支事务,并为其生成对应的undo_log,最终3个update操作被当作3个分支事务来处理
  2. 在回滚阶段,需要回滚3个分支事务
  3. 数据的一致性通过seata全局事务来保证

使用@Transactional注解

  1. 在提交阶段,3个update操作被当作一个分支事务来提交,所以最终只会注册一个分支事务
  2. 在回滚阶段,需要回滚1个分支事务
  3. 数据的一致性:这3个update的操作通过本地事务的一致性保证;全局一致性由seata全局事务来保证。此时3个update仅仅是一个分支事务而已

结论

通过上面的对比,答案是显而易见的,合理的使用本地事务,可以大大的提升全局事务的处理速度。上面仅仅是3个DB操作,如果一个方法里面涉及到的DB操作更多呢,这时候两种方式的差别是不是更大呢?

最后,感谢@FUNKYE大佬为我解答了很多问题并提供了宝贵建议!

· 阅读需 12 分钟

【分布式事务Seata源码解读二】Client端启动流程

本文从源码的角度分析一下AT模式下Client端启动流程,所谓的Client端,即业务应用方。分布式事务分为三个模块:TC、TM、RM。其中TC位于seata-server端,而TM、RM通过SDK的方式运行在client端。

下图展示了Seata官方Demo的一个分布式事务场景,分为如下几个微服务,共同实现了一个下订单、扣库存、扣余额的分布式事务。

  • BusinessService: 业务服务,下单服务的入口
  • StorageService: 库存微服务,用于扣减商品库存
  • OrderService: 订单微服务,创建订单
  • AccountService: 账户微服务,扣减用户账户的余额

在这里插入图片描述

从上图也可以看出,在AT模式下Seata Client端主要通过如下三个模块来实现分布式事务:

  • GlobalTransactionScanner: GlobalTransactionScanner负责初始TM、RM模块,并为添加分布式事务注解的方法添加拦截器,拦截器负责全局事务的开启、提交或回滚
  • DatasourceProxy: DatasourceProxy为DataSource添加拦截,拦截器会拦截所有SQL执行,并作为RM事务参与方的角色参与分布式事务执行。
  • Rpc Interceptor: 在上一篇分布式事务Seata源码解读一中有提到分布式事务的几个核心要点,其中有一个是分布式事务的跨服务实例传播。Rpc Interceptor的职责就是负责在多个微服务之间传播事务。

seata-spring-boot-starter

引用seata分布式事务SDK有两种方式,依赖seata-all或者seata-spring-boot-starter,推荐使用seata-spring-boot-starter,因为该starter已经自动注入了上面提到的三个模块,用户只要添加相应的配置,在业务代码添加全局分布式事务注解即可。下面从seata-spring-boot-starter项目中的代码入手:

如下图所示是seata-spring-boot-starter的项目结构: 在这里插入图片描述 主要分为以下几个模块:

  • properties: properties目录下都是Springboot 适配seata的相关配置类,即可以通过SpringBoot的配置方式来Seata的相关参数
  • provider: provider目录下的类负责把Springboot、SpringCloud的配置适配到Seata配置中
  • resources: resources目录下主要有两个文件,spring.factories用于注册Springboot的自动装配类,ExtConfigurationProvider用于注册SpringbootConfigurationProvider类,该Provider类负责把SpringBoot的相关配置类适配到Seata中。

对于springboot-starter项目,我们先查看resources/META-INF/spring.factories文件:

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration

可以看到在spring.factories中配置了自动装配类:SeataAutoConfiguration,在该装配类中主要注入了GlobalTransactionScanner和seataAutoDataSourceProxyCreator两个实例。代码如下:

@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled",
havingValue = "true",
matchIfMissing = true)
@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class SeataAutoConfiguration {

...

// GlobalTransactionScanner负责为添加GlobalTransaction注解的方法添加拦截器,
// 并且负责初始化RM、TM
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties,
FailureHandler failureHandler) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Automatically configure Seata");
}
return new GlobalTransactionScanner(seataProperties.getApplicationId(),
seataProperties.getTxServiceGroup(),
failureHandler);
}

// SeataAutoDataSourceProxyCreator负责为Spring中的所有DataSource生成代理对象,
// 从而实现拦截所有SQL的执行
@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {
"enableAutoDataSourceProxy", "enable-auto" +
"-data-source-proxy"}, havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),
seataProperties.getExcludesForAutoProxying());
}
}

GlobalTransactionScanner

GlobalTransactionScanner继承于AutoProxyCreator,AutoProxyCreator是Spring中实现AOP的一种方式,可以拦截Spring中的所有实例,判断是否需要进行代理。下面列出了GlobalTransactionScanner中一些比较重要的字段和拦截代理的核心方法:

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements InitializingBean, ApplicationContextAware,
DisposableBean {
...
// interceptor字段是对应一个代理对象的拦截器,
// 可以认为是一个临时变量,有效期是一个被代理对象
private MethodInterceptor interceptor;

// globalTransactionalInterceptor是通用的Interceptor,
// 非TCC事务方式的都使用该Interceptor
private MethodInterceptor globalTransactionalInterceptor;

// PROXYED_SET存储已经代理过的实例,防止重复处理
private static final Set<String> PROXYED_SET = new HashSet<>();

// applicationId是一个服务的唯一标识,
// 对应springcloud项目中的spring.application.name
private final String applicationId;
// 事务的分组标识,参考文章wiki:https://seata.apache.org/zh-cn/docs/user/txgroup/transaction-group/
private final String txServiceGroup;

...

// 判断是否需要代理目标对象,如果需要代理,则生成拦截器赋值到类变量interceptor中
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// 判断是否禁用分布式事务
if (disableGlobalTransaction) {
return bean;
}
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}

// 每次处理一个被代理对象时先把interceptor置为null,所以interceptor的
// 生命周期是一个被代理对象,由于是在另外一个方法getAdvicesAndAdvisorsForBean
// 中使用interceptor,所以该interceptor要定义为一个类变量
interceptor = null;

// 判断是否是TCC事务模式,判断的主要依据是方法上是否有TwoPhaseBusinessAction注解
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName,
applicationContext)) {
// 创建一个TCC事务的拦截器
interceptor =
new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
// 获取待处理对象的class类型
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
// 获取待处理对象继承的所有接口
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

// 如果待处理对象的class或者继承的接口上有GlobalTransactional注解,
// 或者待处理对象的class的任一个方法上有GlobalTransactional或者
// GlobalLock注解则返回true,即需要被代理
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}

// 如果interceptor为null,即不是TCC模式,
// 则使用globalTransactionalInterceptor作为拦截器
if (interceptor == null) {
// globalTransactionalInterceptor只会被创建一次
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor =
new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener) globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
}

if (!AopUtils.isAopProxy(bean)) {
// 如果bean本身不是Proxy对象,则直接调用父类的wrapIfNecessary生成代理对象即可
// 在父类中会调用getAdvicesAndAdvisorsForBean获取到上面定义的interceptor
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
// 如果该bean已经是代理对象了,则直接在代理对象的拦截调用链AdvisedSupport
// 上直接添加新的interceptor即可。
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName,
getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
// 标识该beanName已经处理过了
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}

// 返回wrapIfNecessary方法中计算出的interceptor对象
@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName,
TargetSource customTargetSource)
throws BeansException {
return new Object[]{interceptor};
}
}

上面介绍了GlobalTransactionScanner是如何通过注解拦截全局事务的,具体拦截器实现为TccActionInterceptor和GlobalTransactionalInterceptor,对于AT模式来说我们主要关心GlobalTransactionalInterceptor,在后续的文章中会介绍GlobalTransactionalInterceptor的具体实现。

另外GloabalTransactionScanner还负责TM、RM的初始化工作,是在initClient方法中实现的:

private void initClient() {
...

//初始化TM
TMClient.init(applicationId, txServiceGroup);
...

//初始化RM
RMClient.init(applicationId, txServiceGroup);
...

// 注册Spring shutdown的回调,用来释放资源
registerSpringShutdownHook();

}

TMClient、RMClient都是Seata基于Netty实现的Rpc框架的客户端类,只是业务逻辑不同,由于TMClient相对来说更简单一些,我们以RMClient为例看一下源码:

public class RMClient {
// RMClient的init是一个static方法,创建了一个RmNettyRemotingClient实例,并调用init方法
public static void init(String applicationId, String transactionServiceGroup) {
RmNettyRemotingClient rmNettyRemotingClient =
RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
rmNettyRemotingClient.init();
}
}

RmNettyRemotingClient的实现如下:

@Sharable
public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {
// ResourceManager负责处理事务参与方,支持AT、TCC、Saga三种模式
private ResourceManager resourceManager;
// RmNettyRemotingClient单例
private static volatile RmNettyRemotingClient instance;
private final AtomicBoolean initialized = new AtomicBoolean(false);
// 微服务的唯一标识
private String applicationId;
// 分布式事务分组名称
private String transactionServiceGroup;

// RMClient中init方法会调用该init方法
public void init() {
// 注册Seata自定义Rpc的Processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
// 调用父类的init方法,在父类中负责Netty的初始化,与Seata-Server建立连接
super.init();
}
}

// 注册Seata自定义Rpc的Processor
private void registerProcessor() {
// 1.注册Seata-Server发起branchCommit的处理Processor
RmBranchCommitProcessor rmBranchCommitProcessor =
new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor,
messageExecutor);

// 2.注册Seata-Server发起branchRollback的处理Processor
RmBranchRollbackProcessor rmBranchRollbackProcessor =
new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor
, messageExecutor);

// 3.注册Seata-Server发起删除undoLog的处理Processor
RmUndoLogProcessor rmUndoLogProcessor =
new RmUndoLogProcessor(getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor,
messageExecutor);

// 4.注册Seata-Server返回Response的处理Processor,ClientOnResponseProcessor
// 用于处理由Client主动发起Request,Seata-Server返回的Response。
// ClientOnResponseProcessor负责把Client发送的Request和Seata-Server
// 返回的Response对应起来,从而实现Rpc
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(),
getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor,
null);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT,
onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT,
onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT,
onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);

// 5. 处理Seata-Server返回的Pong消息
ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor,
null);
}
}

上面的逻辑看起来比较复杂,相关类也比较多,如:各种Processor、各种MessageType、TransactionMessageHandler、ResourceManager。其实本质上就是Rpc调用,分为Rm主动调用和Seata主动调用。

  • Rm主动调用方法: 如:注册分支、汇报分支状态、申请全局锁等。Rm主动调用的方法都需要在ClientOnResponseProcessor中处理Seata-Server返回的Response
  • Seata-Server主动调用方法: 如:提交分支事务、回滚分支事务、删除undolog日志。Seata-Server主动调用的方法,Client端分别对应不同的Processor来处理,并且处理结束后要返回给Seata-Server处理结果Response。而事务提交、回滚的核心实现逻辑都在TransactionMessageHandler、ResourceManager中。

关于TransactionMessageHandler、ResourceManager的具体实现也会在后续的章节中详细描述。

下一篇会介绍一下SeataAutoDataSourceProxyCreator、Rpc Interceptor是如何初始化以及拦截的。

· 阅读需 9 分钟

前言

最近因为工作需要,研究学习了Seata分布式事务框架,本文把自己学习的知识记录一下

Seata总览

cloc代码统计

先看一下seata项目cloc代码统计(截止到2020-07-20)

cloc-seata

Java代码行数大约是 97K

代码质量

单元测试覆盖率50%

cloc-seata

Demo代码

本文讲的Demo代码是seata-samples项目下的seata-samples-dubbo模块,地址如下:

https://github.com/apache/incubator-seata-samples/tree/master/dubbo

解决的核心问题

AT模式的Demo例子给出了一个典型的分布式事务场景:

  • 在一个采购交易中,需要:
  1. 扣减商品库存
  2. 扣减用户账号余额
  3. 生成采购订单
  • 很明显,以上3个步骤必须:要么全部成功,要么全部失败,否则系统的数据会错乱
  • 而现在流行的微服务架构,一般来说,库存,账号余额,订单是3个独立的系统
  • 每个微服务有自己的数据库,相互独立

这里就是分布式事务的场景。

设计图

解决方案

AT模式解决这个问题的思路其实很简单,一句话概括就是:

在分布式事务过程中,记录待修改的数据修改前和修改后的值到undo_log表,万一交易中出现异常,通过这个里的数据做回滚

当然,具体代码实现起来,我相信很多细节远没这么简单。

Demo代码结构

从github上clone最新的代码

git clone git@github.com:apache/incubator-seata-samples.git

阅读Demo代码结构

$ cd seata-samples/dubbo/
$ tree -C -I 'target' .
.
├── README.md
├── pom.xml
├── seata-samples-dubbo.iml
└── src
└── main
├── java
│ └── io
│ └── seata
│ └── samples
│ └── dubbo
│ ├── ApplicationKeeper.java
│ ├── Order.java
│ ├── service
│ │ ├── AccountService.java
│ │ ├── BusinessService.java
│ │ ├── OrderService.java
│ │ ├── StorageService.java
│ │ └── impl
│ │ ├── AccountServiceImpl.java
│ │ ├── BusinessServiceImpl.java
│ │ ├── OrderServiceImpl.java
│ │ └── StorageServiceImpl.java
│ └── starter
│ ├── DubboAccountServiceStarter.java
│ ├── DubboBusinessTester.java
│ ├── DubboOrderServiceStarter.java
│ └── DubboStorageServiceStarter.java
└── resources
├── file.conf
├── jdbc.properties
├── log4j.properties
├── registry.conf
├── spring
│ ├── dubbo-account-service.xml
│ ├── dubbo-business.xml
│ ├── dubbo-order-service.xml
│ └── dubbo-storage-service.xml
└── sql
├── dubbo_biz.sql
└── undo_log.sql

13 directories, 27 files
  • 在io.seata.samples.dubbo.starter包下的4个*Starter类,分别模拟上面所述的4个微服务

    • Account
    • Business
    • Order
    • Storage
  • 4个服务都是标准的dubbo服务,配置文件在seata-samples/dubbo/src/main/resources/spring目录下

  • 运行demo需要把这4个服务都启动起来,Business最后启动

  • 主要的逻辑在io.seata.samples.dubbo.service,4个实现类分别对应4个微服务的业务逻辑

  • 数据库信息的配置文件:src/main/resources/jdbc.properties

时序图

cloc-seata

Ok, 赶紧动手, Make It Happen!

运行Demo

MySQL

建表

执行seata-samples/dubbo/src/main/resources/sql的脚本dubbo_biz.sql和undo_log.sql

mysql> show tables;
+-----------------+
| Tables_in_seata |
+-----------------+
| account_tbl |
| order_tbl |
| storage_tbl |
| undo_log |
+-----------------+
4 rows in set (0.01 sec)

执行完之后,数据库里应该有4个表

修改seata-samples/dubbo/src/main/resources/jdbc.properties文件

根据你MySQL运行的环境修改变量的值

jdbc.account.url=jdbc:mysql://localhost:3306/seata
jdbc.account.username=your_username
jdbc.account.password=your_password
jdbc.account.driver=com.mysql.jdbc.Driver
# storage db config
jdbc.storage.url=jdbc:mysql://localhost:3306/seata
jdbc.storage.username=your_username
jdbc.storage.password=your_password
jdbc.storage.driver=com.mysql.jdbc.Driver
# order db config
jdbc.order.url=jdbc:mysql://localhost:3306/seata
jdbc.order.username=your_username
jdbc.order.password=your_password
jdbc.order.driver=com.mysql.jdbc.Driver

ZooKeeper

启动ZooKeeper,我的本地的Mac是使用Homebrew安装启动的

$ brew services start zookeeper 
==> Successfully started `zookeeper` (label: homebrew.m

$ brew services list
Name Status User Plist
docker-machine stopped
elasticsearch stopped
kafka stopped
kibana stopped
mysql started portman /Users/portman/Librar
y/LaunchAgents/homebrew.mxcl.mysql.plist
nginx stopped
postgresql stopped
redis stopped
zookeeper started portman /Users/portman/Librar
y/LaunchAgents/homebrew.mxcl.zookeeper.plist

启动TC事务协调器

在这个链接里页面中,下载对应版本的seata-server程序,我本地下载的是1.2.0版本

  1. 进入文件所在目录并解压文件
  2. 进入seata目录
  3. 执行启动脚本
$ tar -zxvf seata-server-1.2.0.tar.gz
$ cd seata
$ bin/seata-server.sh

观察启动日志是否有报错信息,如果一切正常,并看到了以下的Server started的信息,说明启动成功了。

2020-07-23 13:45:13.810 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ...

IDE中启动模拟的微服务

  1. 首先要把seata-samples项目导入到本地IDE中,这里我用的是IntelliJ IDEA
  2. 刷新Maven的工程依赖
  3. 先启动Account,Order,Storage这个3个服务,然后Business才能去调用,对应的启动类分别是:
io.seata.samples.dubbo.starter.DubboStorageServiceStarter
io.seata.samples.dubbo.starter.DubboOrderServiceStarter
io.seata.samples.dubbo.starter.DubboStorageServiceStarter

每个服务启动完之后,看到这句提示信息,说明服务启动成功了

Application is keep running ...

cloc-seata

启动成功后,account_tbl,storage_tbl表会有两条初始化的数据,分别是账户余额和商品库存

mysql> SELECT * FROM account_tbl; SELECT * FROM storage_tbl;
+----+---------+-------+
| id | user_id | money |
+----+---------+-------+
| 1 | U100001 | 999 |
+----+---------+-------+
1 row in set (0.00 sec)

+----+----------------+-------+
| id | commodity_code | count |
+----+----------------+-------+
| 1 | C00321 | 100 |
+----+----------------+-------+
1 row in set (0.00 sec)

使用Business验证效果

正常情况

还是在IDE中执行DubboBusinessTester类的主函数,程序跑完会自动退出

在程序一切正常的情况下,每个微服务的事物都应该是提交了的,数据保持一致

我们来看一下MySQL中数据的变化

mysql> SELECT * FROM account_tbl; SELECT * FROM order_tbl; SELECT * FROM storage_tbl;
+----+---------+-------+
| id | user_id | money |
+----+---------+-------+
| 1 | U100001 | 599 |
+----+---------+-------+
1 row in set (0.00 sec)

+----+---------+----------------+-------+-------+
| id | user_id | commodity_code | count | money |
+----+---------+----------------+-------+-------+
| 1 | U100001 | C00321 | 2 | 400 |
+----+---------+----------------+-------+-------+
1 row in set (0.00 sec)

+----+----------------+-------+
| id | commodity_code | count |
+----+----------------+-------+
| 1 | C00321 | 98 |
+----+----------------+-------+
1 row in set (0.00 sec)

从3个表的数据可以看到:账户余额扣减了400块;订单表增加了1条记录;商品库存扣减了2个

这个结果是程序的逻辑是一致的,说明事务没有问题

异常情况

其实即使不加入分布式事务的控制,一切都正常情况下,事务本身就不会有问题的

所以我们来重点关注,当程序出现异常时的情况

现在我把BusinessServiceImpl的抛异常的代码注释放开,然后再执行一次DubboBusinessTester,来看看有什么情况发生

		@Override
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
public void purchase(String userId, String commodityCode, int orderCount) {
LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
storageService.deduct(commodityCode, orderCount);
orderService.create(userId, commodityCode, orderCount);

//放开这句抛异常的注释,模拟程序出现异常
throw new RuntimeException("portman's foooooobar error.");

}

接着,我再一次执行DubboBusinessTester,执行过程中在控制台可以看到异常报错信息

Exception in thread "main" java.lang.RuntimeException: portman's foooooobar error.

现在我们再看一下MySQL里的数据变化,发现数据没有任何变化,说明分布式事务的控制已经起作用了

待思考问题

上面的步骤只是演示了seata最简单的demo程序,更多更复杂的情况后续大家可以一起讨论和验证

学习过程中还有一些问题和疑惑,后续进一步学习

  • 全局锁对性能的影响程度
  • undo_log日志可以回滚到原来状态,但是如果数据状态已经发生变化如何处理(比如增加的用户积分已经被别的本地事务花掉了)

参考文献

作者信息

许晓加,金蝶软件架构师

Github

· 阅读需 12 分钟

RPC 模块是我最初研究 Seata 源码开始的地方,因此我对 Seata 的 RPC 模块有过一些深刻研究,在我研究了一番后,发现 RPC 模块中的代码需要进行优化,使得代码更加优雅,交互逻辑更加清晰易懂,本着 “让天下没有难懂的 RPC 通信代码” 的初衷,我开始了 RPC 模块的重构之路。

这里建议想要深入了解 Seata 交互细节的,不妨从 RPC 模块的源码入手,RPC 模块相当于 Seata 的中枢,Seata 所有的交互逻辑在 RPC 模块中表现得淋漓尽致。

这次 RPC 模块的重构将会使得 Seata 的中枢变得更加健壮和易于解读。

重构继承关系

在 Seata 的旧版本中,RPC 模块的整体结构有点混乱,尤其是在各个类的继承关系上,主要体现在:

  1. 直接在 Remoting 类继承 Netty Handler,使得 Remoting 类与 Netty Handler 处理逻辑耦合在一起;
  2. 客户端和服务端的 Reomting 类继承关系不统一;
  3. RemotingClient 被 RpcClientBootstrap 实现,而 RemotingServer 却被 RpcServer 实现,没有一个独立的 ServerBootstrap,这个看起来关系非常混乱;
  4. 有些接口没必要抽取出来,比如 ClientMessageSender、ClientMessageListener、ServerMessageSender 等接口,因这些接口会增加整体结构继承关系的复杂性。

针对上面发现的问题,在重构过程中我大致做了如下事情:

  1. 将 Netty Handler 抽象成一个内部类放在 Remoting 类中;
  2. 将 RemotingClient 为客户端顶级接口,定义客户端与服务端交互的基本方法,抽象一层 AbstractNettyRemotingClient,下面分别有 RmNettyRemotingClient、TmNettyRemotingClient;将 RemotingServer 为服务端顶级接口,定义服务端与客户端交互的基本方法,实现类 NettyRemotingServer;
  3. 同时将 ClientMessageSender、ClientMessageListener、ServerMessageSender 等接口方法归入到 RemotingClient、RemotingServer 中,由 Reomting 类实现 RemotingClient、RemotingServer,统一 Remoting 类继承关系;
  4. 新建 RemotingBootstrap 接口,客户端和服务端分别实现 NettyClientBootstrap、NettyServerBootstrap,将引导类逻辑从 Reomting 类抽离出来。

在最新的 RPC 模块中的继承关系简单清晰,用如下类关系图表示:

  1. AbstractNettyRemoting:Remoting 类的最顶层抽象,包含了客户端和服务端公用的成员变量与公用方法,拥有通用的请求方法(文章后面会讲到),Processor 处理器调用逻辑(文章后面会讲到);
  2. RemotingClient:客户端最顶级接口,定义客户端与服务端交互的基本方法;
  3. RemotingServer:服务端最顶级接口,定义服务端与客户端交互的基本方法;
  4. AbstractNettyRemotingClient:客户端抽象类,继承 AbstractNettyRemoting 类并实现了 RemotingClient 接口;
  5. NettyRemotingServer:服务端实现类,继承 AbstractNettyRemoting 类并实现了 RemotingServer 接口;
  6. RmNettyRemotingClient:Rm 客户端实现类,继承 AbstractNettyRemotingClient 类;
  7. TmNettyRemotingClient:Tm 客户端实现类,继承 AbstractNettyRemotingClient 类。

同时将客户端和服务端的引导类逻辑抽象出来,如下类关系图表示:

  1. RemotingBootstrap:引导类接口,有 start 和 stop 两个抽象方法;
  2. NettyClientBootstrap:客户端引导实现类;
  3. NettyServerBootstrap:服务端引导实现类。

解耦处理逻辑

解耦处理逻辑即是将 RPC 交互的处理逻辑从 Netty Handler 中抽离出来,并将处理逻辑抽象成一个个 Processor,为什么要这么做呢?我大致讲下现在存在的一些问题:

  1. Netty Handler 与 处理逻辑是糅合在一起的,由于客户端与服务端都共用了一套处理逻辑,因此为了兼容更多的交互,在处理逻辑中你可以看到非常多难以理解的判断逻辑;
  2. 在 Seata 的交互中有些请求是异步处理的,也有一些请求是同步处理的,但是在旧的处理代码逻辑中对同步异步处理的表达非常隐晦,而且难以看明白;
  3. 无法从代码逻辑当中清晰地表达出请求消息类型与对应的处理逻辑关系;
  4. 在 Seata 后面的更新迭代中,如果不将处理处理逻辑抽离出来,这部分代码想要增加新的交互逻辑,将会非常困难。

在将处理逻辑从 Netty Handler 进行抽离之前,我们先梳理一下 Seata 现有的交互逻辑:

  • RM 客户端请求服务端的交互逻辑:

  • TM 客户端请求服务端的交互逻辑:

  • 服务端请求 RM 客户端的交互逻辑:

从以上的交互图中可以清晰地看到了 Seata 的交互逻辑。

客户端总共接收服务端的消息:

1)服务端请求消息

  1. BranchCommitRequest、BranchRollbackRequest、UndoLogDeleteRequest

2)服务端响应消息

  1. RegisterRMResponse、BranchRegisterResponse、BranchReportResponse、GlobalLockQueryResponse

RegisterTMResponse、GlobalBeginResponse、GlobalCommitResponse、GlobalRollbackResponse、GlobalStatusResponse、GlobalReportResponse 3. HeartbeatMessage(PONG)

服务端总共接收客户端的消息:

1)客户端请求消息:

  1. RegisterRMRequest、BranchRegisterRequest、BranchReportRequest、GlobalLockQueryRequest

RegisterTMRequest、GlobalBeginRequest、GlobalCommitRequest、GlobalRollbackRequest、GlobalStatusRequest、GlobalReportRequest 3. HeartbeatMessage(PING)

2)客户端响应消息:

  1. BranchCommitResponse、BranchRollbackResponse

基于以上的交互逻辑分析,我们可以将处理消息的逻辑抽象成若干个 Processor,一个 Processor 可以处理一个或者多个消息类型的消息,只需在 Seata 启动时注册将消息类型注册到 ProcessorTable 中即可,形成一个映射关系,这样就可以根据消息类型调用对应的 Processor 对消息进行处理,用如下图表示:

在抽象 Remoting 类中定一个 processMessage 方法,方法逻辑是根据消息类型从 ProcessorTable 中拿到消息类型对应的 Processor。

这样就成功将处理逻辑从 Netty Handler 中彻底抽离出来了,Handler#channelRead 方法只需要调用 processMessage 方法即可,且还可以灵活根据消息类型动态注册 Processor 到 ProcessorTable 中,处理逻辑的可扩展性得到了极大的提升。

以下是 Processor 的调用流程:

1)客户端

  1. RmBranchCommitProcessor:处理服务端全局提交请求;
  2. RmBranchRollbackProcessor:处理服务端全局回滚请求;
  3. RmUndoLogProcessor:处理服务端 undo log 删除请求;
  4. ClientOnResponseProcessor:客户端处理服务端响应请求,如:BranchRegisterResponse、GlobalBeginResponse、GlobalCommitResponse 等;
  5. ClientHeartbeatProcessor:处理服务端心跳响应。

2)服务端

  1. RegRmProcessor:处理 RM 客户端注册请求;
  2. RegTmProcessor:处理 TM 客户端注册请求;
  3. ServerOnRequestProcessor:处理客户端相关请求,如:BranchRegisterRequest、GlobalBeginRequest、GlobalLockQueryRequest 等;
  4. ServerOnResponseProcessor:处理客户端相关响应,如:BranchCommitResponse、BranchRollbackResponse 等;
  5. ServerHeartbeatProcessor:处理客户端心跳响应。

下面我以 TM 发起全局事务提交请求为例子,让大家感受下 Processor 在整个交互中所处的位置:

重构请求方法

在 Seata 的旧版本当中,RPC 的请求方法也是欠缺优雅,主要体现在:

  1. 请求方法过于杂乱无章,没有层次感;
  2. sendAsyncRequest 方法耦合的代码太多,逻辑过于混乱,客户端与服务端都共用了一套请求逻辑,方法中决定是否批量发送是根据参数 address 是否为 null 决定,决定是否同步请求是根据 timeout 是否大于 0 决定,显得极为不合理,且批量请求只有客户端有用到,服务端并没有批量请求,共用一套请求逻辑还会导致服务端异步请求也会创建 MessageFuture 放入 futures 中;
  3. 请求方法名称风格不统一,比如客户端 sendMsgWithResponse,服务端却叫 sendSyncRequest;

针对以上旧版本 RPC 请求方法的各种缺点,我作了以下改动:

  1. 将请求方法统一放入 RemotingClient、RemotingServer 接口当中,并作为顶级接口;
  2. 分离客户端与服务端请求逻辑,将批量请求逻辑单独抽到客户端相关请求方法中,使得是否批量发送不再根据参数 address 是否为 null 决定;
  3. 由于 Seata 自身的逻辑特点,客户端服务端请求方法的参数无法统一,可通过抽取通用的同步/异步请求方法,客户端和服务端根据自身请求逻辑特点实现自身的同步/异步请求逻辑,最后再调用通用的同步/异步请求方法,使得同步/异步请求都有明确的方法,不再根据 timeout 是否大于 0 决定;
  4. 统一请求名称风格。

最终,Seata RPC 的请求方法终于看起来更加优雅且有层次感了。

同步请求:

异步请求:

其它

  1. 类目录调整:RPC 模块目录中还有一个 netty 目录,也可以从目录结构中发现 Seata 的初衷是兼容多个 RPC 框架,目前只实现了 netty,但发现 netty 模块中有些类并不 ”netty“,且 RPC 跟目录的类也并不通用,因此需要将相关类的位置进行调整;
  2. 某些类重新命名,比如 netty 相关类包含 「netty」;

最终 RPC 模块看起来是这样的:

作者简介

张乘辉,目前就职于蚂蚁集团,热爱分享技术,微信公众号「后端进阶」作者,技术博客(https://objcoding.com/)博主,Seata Contributor,GitHub ID:objcoding。

· 阅读需 19 分钟

【分布式事务Seata源码解读一】Server端启动流程

实现分布式事务的核心要点:

  1. 事务的持久化,事务所处的各种状态事务参与方的各种状态都需要持久化,当实例宕机时才能基于持久化的数据对事务回滚或提交,实现最终一致性
  2. 定时对超时未完成事务的处理(继续尝试提交或回滚),即通过重试机制实现事务的最终一致性
  3. 分布式事务的跨服务实例传播,当分布式事务跨多个实例时需要实现事务的传播,一般需要适配不同的rpc框架
  4. 事务的隔离级别:大多数分布式事务为了性能,默认的隔离级别是读未提交
  5. 幂等性:对于XA或者seata的AT这样的分布式事务来说,都已经默认实现了幂等性,而TCC、Saga这种接口级别实现的分布式事务都还需要业务开发者自己实现幂等性。

本片文章主要从seata-server的启动流程的角度介绍一下seata-server的源码,启动流程图如下:

在这里插入图片描述

1. 启动类Server

seata-server的入口类在Server类中,源码如下:

public static void main(String[] args) throws IOException {
// 从环境变量或运行时参数中获取监听端口,默认端口8091
int port = PortHelper.getPort(args);

// 把监听端口设置到SystemProperty中,Logback的LoggerContextListener实现类
// SystemPropertyLoggerContextListener会把Port写入到Logback的Context中,
// 在logback.xml文件中会使用Port变量来构建日志文件名称。
System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));

// 创建Logger
final Logger logger = LoggerFactory.getLogger(Server.class);
if (ContainerHelper.isRunningInContainer()) {
logger.info("The server is running in container.");
}

// 解析启动以及配置文件的各种配置参数
ParameterParser parameterParser = new ParameterParser(args);

// metrics相关,这里是使用SPI机制获取Registry实例对象
MetricsManager.get().init();

// 把从配置文件中读取到的storeMode写入SystemProperty中,方便其他类使用。
System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());

// 创建NettyRemotingServer实例,NettyRemotingServer是一个基于Netty实现的Rpc框架,
// 此时并没有初始化,NettyRemotingServer负责与客户端SDK中的TM、RM进行网络通信。
nettyRemotingServer = new NettyRemotingServer(WORKING_THREADS);

// 设置监听端口
nettyRemotingServer.setListenPort(parameterParser.getPort());

// UUIDGenerator初始化,UUIDGenerator基于雪花算法实现,
// 用于生成全局事务、分支事务的id。
// 多个Server实例配置不同的ServerNode,保证id的唯一性
UUIDGenerator.init(parameterParser.getServerNode());

// SessionHodler负责事务日志(状态)的持久化存储,
// 当前支持file、db、redis三种存储模式,集群部署模式要使用db或redis模式
SessionHolder.init(parameterParser.getStoreMode());

// 创建初始化DefaultCoordinator实例,DefaultCoordinator是TC的核心事务逻辑处理类,
// 底层包含了AT、TCC、SAGA等不同事务类型的逻辑处理。
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
coordinator.init();
nettyRemotingServer.setHandler(coordinator);
// register ShutdownHook
ShutdownHook.getInstance().addDisposable(coordinator);
ShutdownHook.getInstance().addDisposable(nettyRemotingServer);

// 127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(nettyRemotingServer.getListenPort());

try {
// 初始化Netty,开始监听端口并阻塞在这里,等待程序关闭
nettyRemotingServer.init();
} catch (Throwable e) {
logger.error("nettyServer init error:{}", e.getMessage(), e);
System.exit(-1);
}

System.exit(0);
}

2. 解析配置

参数解析的实现代码在ParameterParser类中,init方法源码如下:

private void init(String[] args) {
try {
// 判断是否运行在容器中,如果运行在容器中则配置从环境变量中获取
if (ContainerHelper.isRunningInContainer()) {
this.seataEnv = ContainerHelper.getEnv();
this.host = ContainerHelper.getHost();
this.port = ContainerHelper.getPort();
this.serverNode = ContainerHelper.getServerNode();
this.storeMode = ContainerHelper.getStoreMode();
} else {
// 基于JCommander获取启动应用程序时配置的参数,
// JCommander通过注解、反射的方式把参数赋值到当前类的字段上。
JCommander jCommander = JCommander.newBuilder().addObject(this).build();
jCommander.parse(args);
if (help) {
jCommander.setProgramName(PROGRAM_NAME);
jCommander.usage();
System.exit(0);
}
}
// serverNode用于雪花算中的实例的唯一标识,需要保证唯一。
// 如果没有指定基于当前服务器的I随机生成一个
if (this.serverNode == null) {
this.serverNode = IdWorker.initWorkerId();
}
if (StringUtils.isNotBlank(seataEnv)) {
System.setProperty(ENV_PROPERTY_KEY, seataEnv);
}
if (StringUtils.isBlank(storeMode)) {
// 这里牵扯到一个重要的Configuration类,ParameterParser只负责获取ip、port、storeMode等核心参数,
// 其他的参数都是从Configuration中获取的。这里如果没有启动参数没有指定storeMode,
// 就从Configuration类中获取。
storeMode = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_MODE,
SERVER_DEFAULT_STORE_MODE);
}
} catch (ParameterException e) {
printError(e);
}

}

在ParameterParser的init方法中第一次调用了ConfigurationFactory.getInstance(),初始化了一个单例的Configuration对象,Configuration负责初始化所有的其他配置参数信息。从Seata Server端的源码中我们能看到两个配置文件file.conf、registry.conf。那么这两个配置文件的区别是什么,两个文件都是必须的吗?我们继续看代码。

ConfigurationFactory.getInstance方法其实就是获取一个单例对象,核心在buildConfiguration方法中,不过在buidlConfiguration方法前,ConfigurationFactory类有一段static代码块会先执行。

// 获取Configuration的单例对象
public static Configuration getInstance() {
if (instance == null) {
synchronized (Configuration.class) {
if (instance == null) {
instance = buildConfiguration();
}
}
}
return instance;
}

// ConfigurationFactory的static代码块
static {
// 获取配置文件的名称,默认为registry.conf
String seataConfigName = System.getProperty(SYSTEM_PROPERTY_SEATA_CONFIG_NAME);
if (seataConfigName == null) {
seataConfigName = System.getenv(ENV_SEATA_CONFIG_NAME);
}
if (seataConfigName == null) {
seataConfigName = REGISTRY_CONF_PREFIX;
}
String envValue = System.getProperty(ENV_PROPERTY_KEY);
if (envValue == null) {
envValue = System.getenv(ENV_SYSTEM_KEY);
}

// 读取registry.conf文件的配置,构建基础的Configuration对象
Configuration configuration = (envValue == null) ? new FileConfiguration(seataConfigName + REGISTRY_CONF_SUFFIX,
false) : new FileConfiguration(seataConfigName + "-" + envValue + REGISTRY_CONF_SUFFIX, false);
Configuration extConfiguration = null;
try {
// ExtConfigurationProvider当前只有一个SpringBootConfigurationProvider实现类
// 用于支持客户端SDK SpringBoot的配置文件方式,对于Server端来说这段逻辑可以忽略。
extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("load Configuration:{}", extConfiguration == null ? configuration.getClass().getSimpleName()
: extConfiguration.getClass().getSimpleName());
}
} catch (EnhancedServiceNotFoundException ignore) {

} catch (Exception e) {
LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
}
CURRENT_FILE_INSTANCE = extConfiguration == null ? configuration : extConfiguration;
}

ConfigurationFactory中的static代码块是从registry.conf中读取配置信息。registry.conf中主有两个配置信息,注册中心配置源配置源用来指定其他更详细的配置项是file.conf或者是apollo等其他配置源。所以registry.conf配置文件时必须的,registry.conf配置文件中指定其他详细配置的配置源,当前配置源支持file、zk、apollo、nacos、etcd3等。所以file.conf不是必须的,只有当设置配置源为file类型时才会读取file.conf文件中的内容。

接下来ConfigurationFactory中的buildConfiguration就是根据registry.conf中设置的配置源来加载更多的配置项。

private static Configuration buildConfiguration() {
ConfigType configType;
String configTypeName;
try {
// 从registry.conf配置文件中读取config.type字段值,并解析为枚举ConfigType
configTypeName = CURRENT_FILE_INSTANCE.getConfig(
ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
+ ConfigurationKeys.FILE_ROOT_TYPE);

if (StringUtils.isBlank(configTypeName)) {
throw new NotSupportYetException("config type can not be null");
}

configType = ConfigType.getType(configTypeName);
} catch (Exception e) {
throw e;
}
Configuration extConfiguration = null;
Configuration configuration;
if (ConfigType.File == configType) {
// 如果配置文件为file类型,则从registry.conf中读取config.file.name配置项,
// 即file类型配置文件的路径,示例中默认为file.conf
String pathDataId = String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR,
ConfigurationKeys.FILE_ROOT_CONFIG, FILE_TYPE, NAME_KEY);
String name = CURRENT_FILE_INSTANCE.getConfig(pathDataId);

// 根据file配置文件的路径构建FileConfuguration对象
configuration = new FileConfiguration(name);
try {
// configuration的额外扩展,也是只对客户端SpringBoot的SDK才生效
extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("load Configuration:{}", extConfiguration == null
? configuration.getClass().getSimpleName() : extConfiguration.getClass().getSimpleName());
}
} catch (EnhancedServiceNotFoundException ignore) {

} catch (Exception e) {
LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
}
} else {
// 如果配置文件的类型不是file,如:nacos、zk等,
// 则通过SPI的方式生成对应的ConfigurationProvider对象
configuration = EnhancedServiceLoader
.load(ConfigurationProvider.class, Objects.requireNonNull(configType).name()).provide();
}
try {
// ConfigurationCache是对Configuration做了一次层代理内存缓存,提升获取配置的性能
Configuration configurationCache;
if (null != extConfiguration) {
configurationCache = ConfigurationCache.getInstance().proxy(extConfiguration);
} else {
configurationCache = ConfigurationCache.getInstance().proxy(configuration);
}
if (null != configurationCache) {
extConfiguration = configurationCache;
}
} catch (EnhancedServiceNotFoundException ignore) {

} catch (Exception e) {
LOGGER.error("failed to load configurationCacheProvider:{}", e.getMessage(), e);
}
return null == extConfiguration ? configuration : extConfiguration;
}

3. 初始化UUIDGenerator

UUIDGenertor初始化接收一个serverNode参数,UUIDGenertor当前是使用了雪花算法来生成唯一Id,该serverNode用来保证多个seata-server实例生成的唯一id不重复。

public class UUIDGenerator {

/**
* Generate uuid long.
*
* @return the long
*/
public static long generateUUID() {
return IdWorker.getInstance().nextId();
}

/**
* Init.
*
* @param serverNode the server node id
*/
public static void init(Long serverNode) {
IdWorker.init(serverNode);
}
}

UUIDGenerator是对IdWorker做了封装,唯一id的核心实现逻辑在IdWoker类中,IdWorker是一个雪花算法实现的。此处的IdWorker又是一个单例

public class IdWorker
/**
* Constructor
*
* @param workerId就是上面提到的ServerNode, 取值范围在0·1023,也就是在64位的UUID中占10位
*/
public IdWorker(long workerId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(
String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
this.workerId = workerId;
}

/**
* Get the next ID (the method is thread-safe)
*
* @return SnowflakeId
*/
public long nextId() {
long timestamp = timeGen();

if (timestamp < lastTimestamp) {
throw new RuntimeException(String.format(
"clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}

synchronized (this) {
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
timestamp = tilNextMillis(lastTimestamp);
}
} else {
sequence = 0L;
}
lastTimestamp = timestamp;
}
//雪花算法64位唯一id组成:第一位0 + 41位时间戳 + 10位workerId + 12位自增序列化(同一时间戳内自增)
return ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
}

4. SessionHolder初始化

SessionHolder负责Session的持久化,一个Session对象对应一个事务,事务分为两种:全局事务(GlobalSession)和分支事务(BranchSession)。 SessionHolder支持file和db两种持久化方式,其中db支持集群模式,推荐使用db。SessionHolder中最主要的四个字段如下:

// ROOT_SESSION_MANAGER用于获取所有的Setssion,以及Session的创建、更新、删除等。
private static SessionManager ROOT_SESSION_MANAGER;
// 用于获取、更新所有的异步commit的Session
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 用于获取、更新所有需要重试commit的Session
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 用于获取、更新所有需要重试rollback的Session
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

SessionHolder的init方法

public static void init(String mode) throws IOException {
if (StringUtils.isBlank(mode)) {
mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
}
StoreMode storeMode = StoreMode.get(mode);
if (StoreMode.DB.equals(storeMode)) {
// 这里又用到了SPI的方式加载SessionManager,
// 其实下面获取的四个SessionManager实例都是同一个类DataBaseSessionManager的不同实例,
// 只是给DataBaseSessionManager的构造函数传参不同。
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME});
RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME});
RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
} else if (StoreMode.FILE.equals(storeMode)) {
//file模式可以先不关心
...
} else {
throw new IllegalArgumentException("unknown store mode:" + mode);
}
// reload方法对于db模式可以忽略
reload();
}

上面看到SessionHolder中的四个SessionManager本质都是类DataBaseSessionManager的实例,只是给构造函数传参不同,看下DataBaseSessionManager的定义:

public DataBaseSessionManager(String name) {
super();
this.taskName = name;
}

// 根据实例的taskName来决定allSessions返回的事务列表,
// 如taskName等于ASYNC_COMMITTING_SESSION_MANAGER_NAME的
// 就返回所有状态为AsyncCommitting的事务。
public Collection<GlobalSession> allSessions() {
// get by taskName
if (SessionHolder.ASYNC_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(GlobalStatus.AsyncCommitting));
} else if (SessionHolder.RETRY_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.CommitRetrying}));
} else if (SessionHolder.RETRY_ROLLBACKING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.RollbackRetrying,
GlobalStatus.Rollbacking, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying}));
} else {
// taskName为null,则对应ROOT_SESSION_MANAGER,即获取所有状态的事务
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {
GlobalStatus.UnKnown, GlobalStatus.Begin,
GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking,
GlobalStatus.RollbackRetrying,
GlobalStatus.TimeoutRollbacking,
GlobalStatus.TimeoutRollbackRetrying,
GlobalStatus.AsyncCommitting}));
}
}

5. 初始化DefaultCoordinator

DefaultCoordinator是事务协调器的核心,如:开启、提交、回滚全局事务,注册、提交、回滚分支事务都是由DefaultCoordinator负责协调处理的。DefaultCoordinato通过RpcServer与远程的TM、RM通信来实现分支事务的提交、回滚等。

public DefaultCoordinator(ServerMessageSender messageSender) {
// 接口messageSender的实现类就是上文提到的RpcServer
this.messageSender = messageSender;

// DefaultCore封装了AT、TCC、Saga等分布式事务模式的具体实现类
this.core = new DefaultCore(messageSender);
}

// init方法初始化了5个定时器,主要用于分布式事务的重试机制,
// 因为分布式环境的不稳定性会造成事务处于中间状态,
// 所以要通过不断的重试机制来实现事务的最终一致性。
// 下面的定时器除了undoLogDelete之外,其他的定时任务默认都是1秒执行一次。
public void init() {
// 处理处于回滚状态可重试的事务
retryRollbacking.scheduleAtFixedRate(() -> {
try {
handleRetryRollbacking();
} catch (Exception e) {
LOGGER.info("Exception retry rollbacking ... ", e);
}
}, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

// 处理二阶段可以重试提交的状态可重试的事务
retryCommitting.scheduleAtFixedRate(() -> {
try {
handleRetryCommitting();
} catch (Exception e) {
LOGGER.info("Exception retry committing ... ", e);
}
}, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

// 处理异步提交的事务
asyncCommitting.scheduleAtFixedRate(() -> {
try {
handleAsyncCommitting();
} catch (Exception e) {
LOGGER.info("Exception async committing ... ", e);
}
}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

// 检查事务的第一阶段已经超时的事务,设置事务状态为TimeoutRollbacking,
// 该事务会由其他定时任务执行回滚操作
timeoutCheck.scheduleAtFixedRate(() -> {
try {
timeoutCheck();
} catch (Exception e) {
LOGGER.info("Exception timeout checking ... ", e);
}
}, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);

// 根据unlog的保存天数调用RM删除unlog
undoLogDelete.scheduleAtFixedRate(() -> {
try {
undoLogDelete();
} catch (Exception e) {
LOGGER.info("Exception undoLog deleting ... ", e);
}
}, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}

6. 初始化NettyRemotingServer

NettyRemotingServer是基于Netty实现的简化版的Rpc服务端,NettyRemotingServer初始化时主要做了两件事:

  1. registerProcessor:注册与Client通信的Processor。
  2. super.init():super.init()方法中负责初始化Netty,并把当前实例的IP端口注册到注册中心中
public void init() {
// registry processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
}
}

private void registerProcessor() {
// 1. 注册核心的ServerOnRequestProcessor,即与事务处理相关的Processor,
// 如:全局事务开始、提交,分支事务注册、反馈当前状态等。
// ServerOnRequestProcessor的构造函数中传入getHandler()返回的示例,这个handler
// 就是前面提到的DefaultCoordinator,DefaultCoordinator是分布式事务的核心处理类
ServerOnRequestProcessor onRequestProcessor =
new ServerOnRequestProcessor(this, getHandler());
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);

// 2.注册ResponseProcessor,ResponseProcessor用于处理当Server端主动发起请求时,
// Client端回复的消息,即Response。如:Server向Client端发送分支事务提交或者回滚的请求时,
// Client返回提交/回滚的结果
ServerOnResponseProcessor onResponseProcessor =
new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);

// 3. Client端发起RM注册请求时对应的Processor
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);

// 4. Client端发起TM注册请求时对应的Processor
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);

// 5. Client端发送心跳请求时对应的Processor
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}

在NettyRemotingServer中有调用基类AbstractNettyRemotingServer的init方法,代码如下:

public void init() {
// super.init()方法中启动了一个定时清理超时Rpc请求的定时任务,3S执行一次。
super.init();
// 配置Netty Server端,开始监听端口。
serverBootstrap.start();
}

// serverBootstrap.start();
public void start() {
// Netty server端的常规配置,其中添加了两个ChannelHandler:
// ProtocolV1Decoder、ProtocolV1Encoder,
// 分别对应Seata自定义RPC协议的解码器和编码器
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
.channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark()))
.localAddress(new InetSocketAddress(listenPort))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
.addLast(new ProtocolV1Decoder())
.addLast(new ProtocolV1Encoder());
if (channelHandlers != null) {
addChannelPipelineLast(ch, channelHandlers);
}

}
});

try {
// 开始监听配置的端口
ChannelFuture future = this.serverBootstrap.bind(listenPort).sync();
LOGGER.info("Server started, listen port: {}", listenPort);
// Netty启动成功之后把当前实例注册到registry.conf配置文件配置的注册中心上
RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
initialized.set(true);
future.channel().closeFuture().sync();
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}

· 阅读需 21 分钟

作者简介:煊檍,GitHub ID:sharajava,阿里巴巴中件间 GTS 研发团队负责人,SEATA 开源项目发起人,曾在 Oracle 北京研发中心多年,从事 WebLogic 核心研发工作。长期专注于中间件,尤其是分布式事务领域的技术实践。

Seata 1.2.0 版本重磅发布新的事务模式:XA 模式,实现对 XA 协议的支持。

这里,我们从三个方面来深入解读这个新的特性:

  • 是什么(What):XA 模式是什么?
  • 为什么(Why):为什么支持 XA?
  • 怎么做(How):XA 模式是如何实现的,以及怎样使用?

1. XA 模式是什么?

这里有两个基本的前置概念:

  1. 什么是 XA?
  2. 什么是 Seata 定义的所谓 事务模式?

基于这两点,再来理解 XA 模式就很自然了。

1.1 什么是 XA?

XA 规范 是 X/Open 组织定义的分布式事务处理(DTP,Distributed Transaction Processing)标准。

XA 规范 描述了全局的事务管理器与局部的资源管理器之间的接口。 XA规范 的目的是允许的多个资源(如数据库,应用服务器,消息队列等)在同一事务中访问,这样可以使 ACID 属性跨越应用程序而保持有效。

XA 规范 使用两阶段提交(2PC,Two-Phase Commit)来保证所有资源同时提交或回滚任何特定的事务。

XA 规范 在上世纪 90 年代初就被提出。目前,几乎所有主流的数据库都对 XA 规范 提供了支持。

1.2 什么是 Seata 的事务模式?

Seata 定义了全局事务的框架。

全局事务 定义为若干 分支事务 的整体协调:

  1. TM 向 TC 请求发起(Begin)、提交(Commit)、回滚(Rollback)全局事务。
  2. TM 把代表全局事务的 XID 绑定到分支事务上。
  3. RM 向 TC 注册,把分支事务关联到 XID 代表的全局事务中。
  4. RM 把分支事务的执行结果上报给 TC。(可选)
  5. TC 发送分支提交(Branch Commit)或分支回滚(Branch Rollback)命令给 RM。
seata-mod

Seata 的 全局事务 处理过程,分为两个阶段:

  • 执行阶段 :执行 分支事务,并 保证 执行结果满足是 可回滚的(Rollbackable)持久化的(Durable)
  • 完成阶段: 根据 执行阶段 结果形成的决议,应用通过 TM 发出的全局提交或回滚的请求给 TC,TC 命令 RM 驱动 分支事务 进行 Commit 或 Rollback。

Seata 的所谓 事务模式 是指:运行在 Seata 全局事务框架下的 分支事务 的行为模式。准确地讲,应该叫作 分支事务模式。

不同的 事务模式 区别在于 分支事务 使用不同的方式达到全局事务两个阶段的目标。即,回答以下两个问题:

  • 执行阶段 :如何执行并 保证 执行结果满足是 可回滚的(Rollbackable)持久化的(Durable)
  • 完成阶段: 收到 TC 的命令后,如何做到分支的提交或回滚?

以我们 Seata 的 AT 模式和 TCC 模式为例来理解:

AT 模式

at-mod
  • 执行阶段:

    • 可回滚:根据 SQL 解析结果,记录回滚日志
    • 持久化:回滚日志和业务 SQL 在同一个本地事务中提交到数据库
  • 完成阶段:

    • 分支提交:异步删除回滚日志记录
    • 分支回滚:依据回滚日志进行反向补偿更新

TCC 模式

tcc-mod
  • 执行阶段:

    • 调用业务定义的 Try 方法(完全由业务层面保证 可回滚持久化
  • 完成阶段:

    • 分支提交:调用各事务分支定义的 Confirm 方法
    • 分支回滚:调用各事务分支定义的 Cancel 方法

1.3 什么是 Seata 的 XA 模式?

XA 模式:

在 Seata 定义的分布式事务框架内,利用事务资源(数据库、消息服务等)对 XA 协议的支持,以 XA 协议的机制来管理分支事务的一种 事务模式。

xa-mod
  • 执行阶段:

    • 可回滚:业务 SQL 操作放在 XA 分支中进行,由资源对 XA 协议的支持来保证 可回滚
    • 持久化:XA 分支完成后,执行 XA prepare,同样,由资源对 XA 协议的支持来保证 持久化(即,之后任何意外都不会造成无法回滚的情况)
  • 完成阶段:

    • 分支提交:执行 XA 分支的 commit
    • 分支回滚:执行 XA 分支的 rollback

2. 为什么支持 XA?

为什么要在 Seata 中增加 XA 模式呢?支持 XA 的意义在哪里呢?

2.1 补偿型事务模式的问题

本质上,Seata 已经支持的 3 大事务模式:AT、TCC、Saga 都是 补偿型 的。

补偿型 事务处理机制构建在 事务资源 之上(要么在中间件层面,要么在应用层面),事务资源 本身对分布式事务是无感知的。

img

事务资源 对分布式事务的无感知存在一个根本性的问题:无法做到真正的 全局一致性 。

比如,一条库存记录,处在 补偿型 事务处理过程中,由 100 扣减为 50。此时,仓库管理员连接数据库,查询统计库存,就看到当前的 50。之后,事务因为异外回滚,库存会被补偿回滚为 100。显然,仓库管理员查询统计到的 50 就是 脏 数据。

可以看到,补偿型 分布式事务机制因为不要求 事务资源 本身(如数据库)的机制参与,所以无法保证从事务框架之外的全局视角的数据一致性。

2.2 XA 的价值

与 补偿型 不同,XA 协议 要求 事务资源 本身提供对规范和协议的支持。

nct

因为 事务资源 感知并参与分布式事务处理过程,所以 事务资源(如数据库)可以保障从任意视角对数据的访问有效隔离,满足全局数据一致性。

比如,上一节提到的库存更新场景,XA 事务处理过程中,中间态数据库存 50 由数据库本身保证,是不会仓库管理员的查询统计 到的。(当然隔离级别需要 读已提交 以上)

除了 全局一致性 这个根本性的价值外,支持 XA 还有如下几个方面的好处:

  1. 业务无侵入:和 AT 一样,XA 模式将是业务无侵入的,不给应用设计和开发带来额外负担。
  2. 数据库的支持广泛:XA 协议被主流关系型数据库广泛支持,不需要额外的适配即可使用。
  3. 多语言支持容易:因为不涉及 SQL 解析,XA 模式对 Seata 的 RM 的要求比较少,为不同语言开发 SDK 较之 AT 模式将更 ,更容易。
  4. 传统基于 XA 应用的迁移:传统的,基于 XA 协议的应用,迁移到 Seata 平台,使用 XA 模式将更平滑。

2.3 XA 广泛被质疑的问题

不存在某一种分布式事务机制可以完美适应所有场景,满足所有需求。

XA 规范早在上世纪 90 年代初就被提出,用以解决分布式事务处理这个领域的问题。

现在,无论 AT 模式、TCC 模式还是 Saga 模式,这些模式的提出,本质上都源自 XA 规范对某些场景需求的无法满足。

XA 规范定义的分布式事务处理机制存在一些被广泛质疑的问题,针对这些问题,我们是如何思考的呢?

  1. 数据锁定:数据在整个事务处理过程结束前,都被锁定,读写都按隔离级别的定义约束起来。

思考:

数据锁定是获得更高隔离性和全局一致性所要付出的代价。

补偿型 的事务处理机制,在 执行阶段 即完成分支(本地)事务的提交,(资源层面)不锁定数据。而这是以牺牲 隔离性 为代价的。

另外,AT 模式使用 全局锁 保障基本的 写隔离,实际上也是锁定数据的,只不过锁在 TC 侧集中管理,解锁效率高且没有阻塞的问题。

  1. 协议阻塞:XA prepare 后,分支事务进入阻塞阶段,收到 XA commit 或 XA rollback 前必须阻塞等待。

思考:

协议的阻塞机制本身并不是问题,关键问题在于 协议阻塞 遇上 数据锁定。

如果一个参与全局事务的资源 “失联” 了(收不到分支事务结束的命令),那么它锁定的数据,将一直被锁定。进而,甚至可能因此产生死锁。

这是 XA 协议的核心痛点,也是 Seata 引入 XA 模式要重点解决的问题。

基本思路是两个方面:避免 “失联” 和 增加 “自解锁” 机制。(这里涉及非常多技术细节,暂时不展开,在后续 XA 模式演进过程中,会专门拿出来讨论)

  1. 性能差:性能的损耗主要来自两个方面:一方面,事务协调过程,增加单个事务的 RT;另一方面,并发事务数据的锁冲突,降低吞吐。

思考:

和不使用分布式事务支持的运行场景比较,性能肯定是下降的,这点毫无疑问。

本质上,事务(无论是本地事务还是分布式事务)机制就是拿部分 性能的牺牲 ,换来 编程模型的简单 。

与同为 业务无侵入 的 AT 模式比较:

首先,因为同样运行在 Seata 定义的分布式事务框架下,XA 模式并没有产生更多事务协调的通信开销。

其次,并发事务间,如果数据存在热点,产生锁冲突,这种情况,在 AT 模式(默认使用全局锁)下同样存在的。

所以,在影响性能的两个主要方面,XA 模式并不比 AT 模式有非常明显的劣势。

AT 模式性能优势主要在于:集中管理全局数据锁,锁的释放不需要 RM 参与,释放锁非常快;另外,全局提交的事务,完成阶段 异步化。

3. XA 模式如何实现以及怎样用?

3.1 XA 模式的设计

3.1.1 设计目标

XA 模式的基本设计目标,两个主要方面:

  1. 从 场景 上,满足 全局一致性 的需求。
  2. 从 应用上,保持与 AT 模式一致的无侵入。
  3. 从 机制 上,适应分布式微服务架构的特点。

整体思路:

  1. 与 AT 模式相同的:以应用程序中 本地事务 的粒度,构建到 XA 模式的 分支事务。
  2. 通过数据源代理,在应用程序本地事务范围外,在框架层面包装 XA 协议的交互机制,把 XA 编程模型 透明化。
  3. 把 XA 的 2PC 拆开,在分支事务 执行阶段 的末尾就进行 XA prepare,把 XA 协议完美融合到 Seata 的事务框架,减少一轮 RPC 交互。

3.1.2 核心设计

1. 整体运行机制

XA 模式 运行在 Seata 定义的事务框架内:

xa-fw
  • 执行阶段(E xecute):

    • XA start/XA end/XA prepare + SQL + 注册分支
  • 完成阶段(F inish):

    • XA commit/XA rollback

2. 数据源代理

XA 模式需要 XAConnection。

获取 XAConnection 两种方式:

  • 方式一:要求开发者配置 XADataSource
  • 方式二:根据开发者的普通 DataSource 来创建

第一种方式,给开发者增加了认知负担,需要为 XA 模式专门去学习和使用 XA 数据源,与 透明化 XA 编程模型的设计目标相违背。

第二种方式,对开发者比较友好,和 AT 模式使用一样,开发者完全不必关心 XA 层面的任何问题,保持本地编程模型即可。

我们优先设计实现第二种方式:数据源代理根据普通数据源中获取的普通 JDBC 连接创建出相应的 XAConnection。

类比 AT 模式的数据源代理机制,如下:

img

但是,第二种方法有局限:无法保证兼容的正确性。

实际上,这种方法是在做数据库驱动程序要做的事情。不同的厂商、不同版本的数据库驱动实现机制是厂商私有的,我们只能保证在充分测试过的驱动程序上是正确的,开发者使用的驱动程序版本差异很可能造成机制的失效。

这点在 Oracle 上体现非常明显。参见 Druid issue:https://github.com/alibaba/druid/issues/3707

综合考虑,XA 模式的数据源代理设计需要同时支持第一种方式:基于 XA 数据源进行代理。

类比 AT 模式的数据源代理机制,如下:

img

3. 分支注册

XA start 需要 Xid 参数。

这个 Xid 需要和 Seata 全局事务的 XID 和 BranchId 关联起来,以便由 TC 驱动 XA 分支的提交或回滚。

目前 Seata 的 BranchId 是在分支注册过程,由 TC 统一生成的,所以 XA 模式分支注册的时机需要在 XA start 之前。

将来一个可能的优化方向:

把分支注册尽量延后。类似 AT 模式在本地事务提交之前才注册分支,避免分支执行失败情况下,没有意义的分支注册。

这个优化方向需要 BranchId 生成机制的变化来配合。BranchId 不通过分支注册过程生成,而是生成后再带着 BranchId 去注册分支。

4. 小结

这里只通过几个重要的核心设计,说明 XA 模式的基本工作机制。

此外,还有包括 连接保持异常处理 等重要方面,有兴趣可以从项目代码中进一步了解。

以后会陆续写出来和大家交流。

3.1.3 演进规划

XA 模式总体的演进规划如下:

  1. 第 1 步(已经完成):首个版本(1.2.0),把 XA 模式原型机制跑通。确保只增加,不修改,不给其他模式引入的新问题。
  2. 第 2 步(计划 5 月完成):与 AT 模式必要的融合、重构。
  3. 第 3 步(计划 7 月完成):完善异常处理机制,进行上生产所必需的打磨。
  4. 第 4 步(计划 8 月完成):性能优化。
  5. 第 5 步(计划 2020 年内完成):结合 Seata 项目正在进行的面向云原生的 Transaction Mesh 设计,打造云原生能力。

3.2 XA 模式的使用

从编程模型上,XA 模式与 AT 模式保持完全一致。

可以参考 Seata 官网的样例:seata-xa

样例场景是 Seata 经典的,涉及库存、订单、账户 3 个微服务的商品订购业务。

在样例中,上层编程模型与 AT 模式完全相同。只需要修改数据源代理,即可实现 XA 模式与 AT 模式之间的切换。

    @Bean("dataSource")
public DataSource dataSource(DruidDataSource druidDataSource) {
// DataSourceProxy for AT mode
// return new DataSourceProxy(druidDataSource);

// DataSourceProxyXA for XA mode
return new DataSourceProxyXA(druidDataSource);
}

4. 总结

在当前的技术发展阶段,不存一个分布式事务处理机制可以完美满足所有场景的需求。

一致性、可靠性、易用性、性能等诸多方面的系统设计约束,需要用不同的事务处理机制去满足。

Seata 项目最核心的价值在于:构建一个全面解决分布式事务问题的 标准化 平台。

基于 Seata,上层应用架构可以根据实际场景的需求,灵活选择合适的分布式事务解决方案。

img

XA 模式的加入,补齐了 Seata 在 全局一致性 场景下的缺口,形成 AT、TCC、Saga、XA 四大 事务模式 的版图,基本可以满足所有场景的分布式事务处理诉求。

当然 XA 模式和 Seata 项目本身都还不尽完美,有很多需要改进和完善的地方。非常欢迎大家参与到项目的建设中,共同打造一个标准化的分布式事务平台。

· 阅读需 14 分钟

Seata阿里开源的一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。

1.1 四种事务模式

Seata 目标打造一站式的分布事务的解决方案,最终会提供四种事务模式:

目前使用的流行度情况是:AT > TCC > Saga。因此,我们在学习 Seata 的时候,可以花更多精力在 AT 模式上,最好搞懂背后的实现原理,毕竟分布式事务涉及到数据的正确性,出问题需要快速排查定位并解决。

友情提示:具体的流行度,朋友可以选择看看 Wanted: who's using Seata 每个公司登记的使用方式。

1.2 三种角色

在 Seata 的架构中,一共有三个角色:

三个角色

  • TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚。
  • TM (Transaction Manager) - 事务管理器:定义全局事务的范围,开始全局事务、提交或回滚全局事务。
  • RM ( Resource Manager ) - 资源管理器:管理分支事务处理的资源( Resource ),与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

其中,TC 为单独部署的 Server 服务端,TM 和 RM 为嵌入到应用中的 Client 客户端。

在 Seata 中,一个分布式事务的生命周期如下:

架构图

友情提示:看下艿艿添加的红色小勾。

  • TM 请求 TC 开启一个全局事务。TC 会生成一个 XID 作为该全局事务的编号。

    XID,会在微服务的调用链路中传播,保证将多个微服务的子事务关联在一起。

  • RM 请求 TC 将本地事务注册为全局事务的分支事务,通过全局事务的 XID 进行关联。

  • TM 请求 TC 告诉 XID 对应的全局事务是进行提交还是回滚。

  • TC 驱动 RM 们将 XID 对应的自己的本地事务进行提交还是回滚。

1.3 框架支持情况

Seata 目前提供了对主流的微服务框架的支持:

同时方便我们集成到 Java 项目当中,Seata 也提供了相应的 Starter 库:

因为 Seata 是基于 DataSource 数据源进行代理来拓展,所以天然对主流的 ORM 框架提供了非常好的支持:

  • MyBatis、MyBatis-Plus
  • JPA、Hibernate

1.4 案例情况

Wanted: who's using Seata 的登记情况,Seata 已经在国内很多团队开始落地,其中不乏有滴滴、韵达等大型公司。可汇总如下图:

汇总图

另外,在 awesome-seata 仓库中,艿艿看到了滴滴等等公司的落地时的技术分享,还是非常真实可靠的。如下图所示:awesome-seata 滴滴

从案例的情况来说,Seata 可能给是目前已知最可靠的分布式事务解决方案,至少对它进行技术投入是非常不错的选择。

2. 部署单机 TC Server

本小节,我们来学习部署单机 Seata TC Server,常用于学习或测试使用,不建议在生产环境中部署单机。

因为 TC 需要进行全局事务和分支事务的记录,所以需要对应的存储。目前,TC 有两种存储模式( store.mode ):

  • file 模式:适合单机模式,全局事务会话信息在内存中读写,并持久化本地文件 root.data,性能较高。
  • db 模式:适合集群模式,全局事务会话信息通过 db 共享,相对性能差点。

显然,我们将采用 file 模式,最终我们部署单机 TC Server 如下图所示:单机 TC Server

哔哔完这么多,我们开始正式部署单机 TC Server,这里艿艿使用 macOS 系统,和 Linux、Windows 是差不多的,朋友脑补翻译。

2.1 下载 Seata 软件包

打开 Seata 下载页面,选择想要的 Seata 版本。这里,我们选择 v1.1.0 最新版本。

# 创建目录
$ mkdir -p /Users/yunai/Seata
$ cd /Users/yunai/Seata

# 下载
$ wget https://github.com/apache/incubator-seata/releases/download/v1.1.0/seata-server-1.1.0.tar.gz

# 解压
$ tar -zxvf seata-server-1.1.0.tar.gz

# 查看目录
$ cd seata
$ ls -ls
24 -rw-r--r-- 1 yunai staff 11365 May 13 2019 LICENSE
0 drwxr-xr-x 4 yunai staff 128 Apr 2 07:46 bin # 执行脚本
0 drwxr-xr-x 9 yunai staff 288 Feb 19 23:49 conf # 配置文件
0 drwxr-xr-x 138 yunai staff 4416 Apr 2 07:46 lib # seata-*.jar + 依赖库

2.2 启动 TC Server

执行 nohup sh bin/seata-server.sh & 命令,启动 TC Server 在后台。在 nohup.out 文件中,我们看到如下日志,说明启动成功:

# 使用 File 存储器
2020-04-02 08:36:01.302 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load TransactionStoreManager[FILE] extension by class[io.seata.server.store.file.FileTransactionStoreManager]
2020-04-02 08:36:01.302 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load SessionManager[FILE] extension by class[io.seata.server.session.file.FileBasedSessionManager]
# 启动成功
2020-04-02 08:36:01.597 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ...
  • 默认配置下,Seata TC Server 启动在 8091 端点。

因为我们使用 file 模式,所以可以看到用于持久化的本地文件 root.data。操作命令如下:

$ ls -ls sessionStore/
total 0
0 -rw-r--r-- 1 yunai staff 0 Apr 2 08:36 root.data

后续,朋友可以阅读「4. 接入 Java 应用」小节,开始使用 Seata 实现分布式事务。

3. 部署集群 TC Server

本小节,我们来学习部署集群 Seata TC Server,实现高可用,生产环境下必备。在集群时,多个 Seata TC Server 通过 db 数据库,实现全局事务会话信息的共享。

同时,每个 Seata TC Server 可以注册自己到注册中心上,方便应用从注册中心获得到他们。最终我们部署 集群 TC Server 如下图所示:集群 TC Server

Seata TC Server 对主流的注册中心都提供了集成,具体可见 discovery 目录。考虑到国内使用 Nacos 作为注册中心越来越流行,这里我们就采用它。

友情提示:如果对 Nacos 不了解的朋友,可以参考《Nacos 安装部署》文章。

哔哔完这么多,我们开始正式部署单机 TC Server,这里艿艿使用 macOS 系统,和 Linux、Windows 是差不多的,朋友脑补翻译。

3.1 下载 Seata 软件包

打开 Seata 下载页面,选择想要的 Seata 版本。这里,我们选择 v1.1.0 最新版本。

# 创建目录
$ mkdir -p /Users/yunai/Seata
$ cd /Users/yunai/Seata

# 下载
$ wget https://github.com/apache/incubator-seata/releases/download/v1.1.0/seata-server-1.1.0.tar.gz

# 解压
$ tar -zxvf seata-server-1.1.0.tar.gz

# 查看目录
$ cd seata
$ ls -ls
24 -rw-r--r-- 1 yunai staff 11365 May 13 2019 LICENSE
0 drwxr-xr-x 4 yunai staff 128 Apr 2 07:46 bin # 执行脚本
0 drwxr-xr-x 9 yunai staff 288 Feb 19 23:49 conf # 配置文件
0 drwxr-xr-x 138 yunai staff 4416 Apr 2 07:46 lib # seata-*.jar + 依赖库

3.2 初始化数据库

① 使用 mysql.sql 脚本,初始化 Seata TC Server 的 db 数据库。脚本内容如下:

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(96),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

在 MySQL 中,创建 seata 数据库,并在该库下执行该脚本。最终结果如下图:seata 数据库 - MySQL 5.X

② 修改 conf/file 配置文件,修改使用 db 数据库,实现 Seata TC Server 的全局事务会话信息的共享。如下图所示:conf/file 配置文件

③ MySQL8 的支持

如果朋友使用的 MySQL 是 8.X 版本,则需要看该步骤。否则,可以直接跳过。

首先,需要下载 MySQL 8.X JDBC 驱动,命令行操作如下:

$ cd lib
$ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar

然后,修改 conf/file 配置文件,使用该 MySQL 8.X JDBC 驱动。如下图所示:seata 数据库 - MySQL 8.X

3.3 设置使用 Nacos 注册中心

修改 conf/registry.conf 配置文件,设置使用 Nacos 注册中心。如下图所示:conf/registry.conf 配置文件

3.4 启动 TC Server

① 执行 nohup sh bin/seata-server.sh -p 18091 -n 1 & 命令,启动第一个 TC Server 在后台。

  • -p:Seata TC Server 监听的端口。
  • -n:Server node。在多个 TC Server 时,需区分各自节点,用于生成不同区间的 transactionId 事务编号,以免冲突。

nohup.out 文件中,我们看到如下日志,说明启动成功:

# 使用 DB 存储器
2020-04-05 16:54:12.793 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load DataSourceGenerator[dbcp] extension by class[io.seata.server.store.db.DbcpDataSourceGenerator]
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
2020-04-05 16:54:13.442 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load LogStore[DB] extension by class[io.seata.core.store.db.LogStoreDataBaseDAO]
2020-04-05 16:54:13.442 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load TransactionStoreManager[DB] extension by class[io.seata.server.store.db.DatabaseTransactionStoreManager]
2020-04-05 16:54:13.442 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load SessionManager[DB] extension by class[io.seata.server.session.db.DataBaseSessionManager]
# 启动成功
2020-04-05 16:54:13.779 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ...
# 使用 Nacos 注册中心
2020-04-05 16:54:13.788 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load RegistryProvider[Nacos] extension by class[io.seata.discovery.registry.nacos.NacosRegistryProvider]

② 执行 nohup sh bin/seata-server.sh -p 28091 -n 2 & 命令,启动第二个 TC Server 在后台。

③ 打开 Nacos 注册中心的控制台,我们可以看到有两个 Seata TC Server 示例。如下图所示:Nacos 控制台

4. 接入 Java 应用

4.1 AT 模式

① Spring Boot

1、《芋道 Spring Boot 分布式事务 Seata 入门》「2. AT 模式 + 多数据源」小节,实现单体 Spring Boot 项目在多数据源下的分布式事务。

整体图

2、《芋道 Spring Boot 分布式事务 Seata 入门》「AT 模式 + HttpClient 远程调用」小节,实现多个 Spring Boot 项目的分布事务。

整体图

② Dubbo

《Dubbo 分布式事务 Seata 入门》「2. AT 模式」小节,实现多个 Dubbo 服务下的分布式事务。

整体图

③ Spring Cloud

《芋道 Spring Cloud Alibaba 分布式事务 Seata 入门》「3. AT 模式 + Feign」小节,实现多个 Spring Cloud 服务下的分布式事务。

整体图

4.2 TCC 模式

4.3 Saga 模式

4.4 XA 模式

Seata 正在开发中...

· 阅读需 7 分钟

使用配置中心和数据库来实现 Seata 的高可用,以 Nacos 和 MySQL 为例,将cloud-seata-nacos应用部署到 Kubernetes 集群中

该应用使用 Nacos 作为配置和注册中心,总共有三个服务: order-service, pay-service, storage-service, 其中 order-service 对外提供下单接口,当余额和库存充足时,下单成功,会提交事务,当不足时会抛出异常,下单失败,回滚事务

准备工作

需要准备可用的注册中心、配置中心 Nacos 和 MySQL,通常情况下,注册中心、配置中心和数据库都是已有的,不需要特别配置,在这个实践中,为了简单,只部署单机的注册中心、配置中心和数据库,假设他们是可靠的

  • 部署 Nacos

在服务器部署 Nacos,开放 8848 端口,用于 seata-server 注册,服务器地址为 192.168.199.2

docker run --name nacos -p 8848:8848 -e MODE=standalone nacos/nacos-server
  • 部署 MySQL

部署一台MySQL 数据库,用于保存事务数据,服务器地址为 192.168.199.2

docker run --name mysql -p 30060:3306-e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7.17

部署 seata-server

  • 创建seata-server需要的表

具体的 SQL 参考 script/server/db,这里使用的是 MySQL 的脚本,数据库名称为 seata

同时,也需要创建 undo_log 表, 可以参考 script/client/at/db/

  • 修改seata-server配置

将以下配置添加到 Nacos 配置中心,具体添加方法可以参考 script/config-center

service.vgroupMapping.my_test_tx_group=default
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://192.168.199.2:30060/seata?useUnicode=true
store.db.user=root
store.db.password=123456

部署 seata-server 到 Kubernetes

  • seata-server.yaml

需要将 ConfigMap 的注册中心和配置中心地址改成相应的地址

apiVersion: v1
kind: Service
metadata:
name: seata-ha-server
namespace: default
labels:
app.kubernetes.io/name: seata-ha-server
spec:
type: ClusterIP
ports:
- port: 8091
protocol: TCP
name: http
selector:
app.kubernetes.io/name: seata-ha-server

---

apiVersion: apps/v1
kind: StatefulSet
metadata:
name: seata-ha-server
namespace: default
labels:
app.kubernetes.io/name: seata-ha-server
spec:
serviceName: seata-ha-server
replicas: 3
selector:
matchLabels:
app.kubernetes.io/name: seata-ha-server
template:
metadata:
labels:
app.kubernetes.io/name: seata-ha-server
spec:
containers:
- name: seata-ha-server
image: docker.io/seataio/seata-server:latest
imagePullPolicy: IfNotPresent
env:
- name: SEATA_CONFIG_NAME
value: file:/root/seata-config/registry
ports:
- name: http
containerPort: 8091
protocol: TCP
volumeMounts:
- name: seata-config
mountPath: /root/seata-config
volumes:
- name: seata-config
configMap:
name: seata-ha-server-config


---
apiVersion: v1
kind: ConfigMap
metadata:
name: seata-ha-server-config
data:
registry.conf: |
registry {
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "192.168.199.2"
}
}
config {
type = "nacos"
nacos {
serverAddr = "192.168.199.2"
group = "SEATA_GROUP"
}
}
  • 部署
kubectl apply -f seata-server.yaml

部署完成后,会有三个 pod

kubectl get pod | grep seata-ha-server

seata-ha-server-645844b8b6-9qh5j 1/1 Running 0 3m14s
seata-ha-server-645844b8b6-pzczs 1/1 Running 0 3m14s
seata-ha-server-645844b8b6-wkpw8 1/1 Running 0 3m14s

待启动完成后,可以在 Nacos 的服务列表中发现三个 seata-server 的实例,至此,已经完成 seata-server 的高可用部署

  • 查看服务日志
kubelet logs -f seata-ha-server-645844b8b6-9qh5j
[0.012s][info   ][gc] Using Serial
2020-04-15 00:55:09.880 INFO [main]io.seata.server.ParameterParser.init:90 -The server is running in container.
2020-04-15 00:55:10.013 INFO [main]io.seata.config.FileConfiguration.<init>:110 -The configuration file used is file:/root/seata-config/registry.conf
2020-04-15 00:55:12.426 INFO [main]com.alibaba.druid.pool.DruidDataSource.init:947 -{dataSource-1} inited
2020-04-15 00:55:13.127 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started

其中{dataSource-1} 说明使用了数据库,并正常初始化完成

  • 查看注册中心,此时seata-serve 这个服务会有三个实例

seata-ha-nacos-list.png

部署业务服务

  • 创建业务表并初始化数据

具体的业务表可以参考 cloud-seata-nacos/README.md

  • 添加 Nacos 配置

在 public 的命名空间下,分别创建 data-id 为 order-service.properties, pay-service.properties, storage-service.properties 的配置,内容相同,需要修改数据库的地址、用户名和密码

# MySQL
spring.datasource.url=jdbc:mysql://192.168.199.2:30060/seata?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# Seata
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group
  • 部署服务

通过 application.yaml 配置文件部署服务,需要注意的是修改 ConfigMap 的 NACOS_ADDR为自己的 Nacos 地址

apiVersion: v1
kind: Service
metadata:
namespace: default
name: seata-ha-service
labels:
app.kubernetes.io/name: seata-ha-service
spec:
type: NodePort
ports:
- port: 8081
nodePort: 30081
protocol: TCP
name: http
selector:
app.kubernetes.io/name: seata-ha-service

---
apiVersion: v1
kind: ConfigMap
metadata:
name: seata-ha-service-config
data:
NACOS_ADDR: 192.168.199.2:8848

---
apiVersion: v1
kind: ServiceAccount
metadata:
name: seata-ha-account
namespace: default

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
name: seata-ha-account
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: cluster-admin
subjects:
- kind: ServiceAccount
name: seata-ha-account
namespace: default

---
apiVersion: apps/v1
kind: Deployment
metadata:
namespace: default
name: seata-ha-service
labels:
app.kubernetes.io/name: seata-ha-service
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: seata-ha-service
template:
metadata:
labels:
app.kubernetes.io/name: seata-ha-service
spec:
serviceAccountName: seata-ha-account
containers:
- name: seata-ha-order-service
image: "registry.cn-qingdao.aliyuncs.com/hellowoodes/seata-ha-order-service:1.1"
imagePullPolicy: IfNotPresent
env:
- name: NACOS_ADDR
valueFrom:
configMapKeyRef:
key: NACOS_ADDR
name: seata-ha-service-config
ports:
- name: http
containerPort: 8081
protocol: TCP
- name: seata-ha-pay-service
image: "registry.cn-qingdao.aliyuncs.com/hellowoodes/seata-ha-pay-service:1.1"
imagePullPolicy: IfNotPresent
env:
- name: NACOS_ADDR
valueFrom:
configMapKeyRef:
key: NACOS_ADDR
name: seata-ha-service-config
ports:
- name: http
containerPort: 8082
protocol: TCP
- name: seata-ha-storage-service
image: "registry.cn-qingdao.aliyuncs.com/hellowoodes/seata-ha-storage-service:1.1"
imagePullPolicy: IfNotPresent
env:
- name: NACOS_ADDR
valueFrom:
configMapKeyRef:
key: NACOS_ADDR
name: seata-ha-service-config
ports:
- name: http
containerPort: 8083
protocol: TCP

通过以下命令,将应用部署到集群中

kubectl apply -f application.yaml

然后查看创建的 pod,seata-ha-service 这个服务下有三个 pod

kubectl get pod | grep seata-ha-service

seata-ha-service-7dbdc6894b-5r8q4 3/3 Running 0 12m

待应用启动后,在 Nacos 的服务列表中,会有相应的服务

seata-ha-service-list.png

此时查看服务的日志,会看到服务向每一个 TC 都注册了

kubectl logs -f seata-ha-service-7dbdc6894b-5r8q4 seata-ha-order-service

seata-ha-service-register.png

查看任意的 TC 日志,会发现每一个服务都向 TC 注册了

kubelet logs -f seata-ha-server-645844b8b6-9qh5j

seata-ha-tc-register.png

测试

测试成功场景

调用下单接口,将 price 设置为 1,因为初始化的余额为 10,可以下单成功

curl -X POST \
http://192.168.199.2:30081/order/placeOrder \
-H 'Content-Type: application/json' \
-d '{
"userId": 1,
"productId": 1,
"price": 1
}'

此时返回结果为:

{"success":true,"message":null,"data":null}

查看TC 的日志,事务成功提交:

seata-ha-commit-tc-success.png

查看 order-service 服务日志 seata-ha-commit-success.png

测试失败场景

设置 price 为 100,此时余额不足,会下单失败抛出异常,事务会回滚

curl -X POST \
http://192.168.199.2:30081/order/placeOrder \
-H 'Content-Type: application/json' \
-d '{
"userId": 1,
"productId": 1,
"price": 100
}'

查看 TC 的日志: seata-ha-commit-tc-rollback.png

查看服务的日志 : seata-ha-commit-service-rollback.png

多次调用查看服务日志,发现会随机的向其中某台TC发起事务注册,当扩容或缩容后,有相应的 TC 参与或退出,证明高可用部署生效

· 阅读需 8 分钟

一 . 导读

根据大佬定义的分类,配置可以有三种:环境配置、描述配置、扩展配置。

环境配置:像一些组件启动时的参数等,通常是离散的简单值,多是 key-value 型数据。

描述配置:与业务逻辑相关,比如:事务发起方和参与方,通常会嵌到业务的生命周期管理中。描述配置信息较多,甚至有层次关系。

扩展配置:产品需要发现第三方实现,对配置的聚合要求比较高,比如:各种配置中心和注册中心,通常做法是在 jar 包的 META-INF/services 下放置接口类全名文件,内容为每行一个实现类类名。

二. 环境配置

seata server 在加载的时候,会使用 resources/registry.conf 来确定配置中心和注册中心的类型。而 seata client 在 1.0 版本后,不仅能使用 conf 文件进行配置的加载,也可以在 springboot 的 yml 配置文件中,使用 seata.config.{type} 来进行配置中心的选择,注册中心与之类似。通过 yml 加载配置的源码在 io.seata.spring.boot.autoconfigure.properties.registry 包下。

如果 seata 客户端的使用者既在 resources 下放了 conf 配置文件又在 yml 文件中配置,那么会优先使用 yml 中配置的。代码:

CURRENT_FILE_INSTANCE = null == extConfiguration ? configuration : extConfiguration;

这里 extConfiguration 是外部配置实例,即 ExtConfigurationProvider#provide() 外部配置提供类提供的,而 configuration 是另一个配置提供类提供的 ConfigurationProvider#provide(),这两个配置提供类是在 config 模块 ConfigurationFactory 静态块中,通过 SPI 的方式加载。

EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);

上面说的是配置中心类型的选择,而配置环境的加载,是在确定了使用什么配置中心类型后,再通过相应的配置中心加载环境配置。File 即文本方式配置也是一种配置中心。

client 和 server 获取配置参数,是通过 ConfigurationFactory#getInstance() 获取配置类实例,再使用配置类实例获取配置参数,配置的 key 这些常量的定义,主要在 core 模块下 config 文件中。

一些重要的环境配置属性的意义,官网都有介绍

在实例化的时候通过 ConfigurationFactory 获取后注入构造函数中的,需要重启才能生效,而在使用时通过 ConfigurationFactory 实时获取的,配置改了就可以生效。

但是 config 模块提供了 ConfigurationChangeListener#onChangeEvent 接口方法来修改实例内部的属性。即在这个方法中,监听动态变化的属性,如果检测到自身使用的属性和刚开始注入时不一样了,就修改实例中保存的属性,和配置中心保持一致,这样就实现了动态配置。

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener {
private volatile boolean disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,false);
@Override public Object invoke(Param param) {
if(disable){//事务业务处理}
}
@Override public void onChangeEvent(Param param) {
disable = param;
}}

上面是 spring 模块下的 GlobalTransactionalInterceptor 与降级属性相关的伪代码。 GlobalTrarnsactionalScanner 在上面的 interceptor 类被实例化时,把 interceptor 注册到了配置变化监听列表中,当配置被改变的时候,会调用监听器:

ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)interceptor);

降级的意思是,当服务某一项功能不可用的时候,通过动态配置的属性,把某一项功能给关了,这样就可以避免一直尝试失败的处理。interceptor#invoke() 只有当这个 disable 属性为 true 时,才会执行 seata 事务相关业务。

三. 描述配置

一般性框架描述性配置通常信息比较多,甚至有层次关系,用 xml 配置比较方便,因为树结构描述性更强。而现在的习惯都在提倡去繁琐的约束性配置,采用约定的方式。

seata AT 模式是通过代理数据源的方式来进行事务处理,对业务方入侵较小,只需让 seata 在启动时,识别哪些业务方需要开启全局事务,所以用注解就可以实现描述性配置。

@GlobalTransactional(timeoutMills = 300000, name = "busi-doBiz")
public String doBiz(String msg) {}

如果是 tcc 模式,事务参与方还需使用注解标识:

@TwoPhaseBusinessAction(name = "tccActionForSpringTest" , commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext, int i);
public boolean commit(BusinessActionContext actionContext);
public boolean rollback(BusinessActionContext actionContext);

四 .扩展配置

扩展配置,通常对产品的聚合要求比较高,因为产品需要发现第三方实现,将其加入产品内部。

在这里插入图片描述 这是一个自定义配置中心提供类的例子,在 META-INF/services 下放置一个接口同名的文本文件,文件的内容为接口的实现类。这是标准的 spi 方式。然后修改配置文件 registry.conf 中的 config.type=test 。

但是如果你认为这样就可以被 seata 识别到,并且替换掉配置中心,那你就错了。seata 在加载配置中心的时候,使用 enum ConfigType 包裹了一下配置文件中配置的配置中心的类型的值:

private static Configuration buildConfiguration() {
configTypeName = "test";//registry.conf中配置的config.type
configType = ConfigType.getType(configTypeName);//ConfigType获取不到会抛异常
}

如果在 ConfigType 中没有定义 test 这种配置中心类型,那么会抛异常。所以单纯的修改配置文件而不改变源码是无法使用 ConfigType 中定义的配置中心提供类以外的配置中心提供类。

目前 1.0 版本在 ConfigType 中定义的配置中心类型有:File,ZK,Nacos,Apollo,Consul,Etcd3,SpringCloudConfig,Custom。如果用户想使用自定义的配置中心类型,可以使用 Custom 这种类型。

在这里插入图片描述 这里可以使用不优雅的方式,即提供一个指定名称 ZK 但是级别 order=3 更高的实现类(ZK 默认 order=1),就可以让 ConfigurationFactory 使用 TestConfigurationProvider 作为配置中心提供类。

通过上面的步骤,就可以让 seata 使用我们自己提供的代码。seata 中 codec、compressor、discovery、integration 等模块,都是使用 spi 机制加载功能类,符合微内核 + 插件化,平等对待第三方的设计思想。

五 . seata 源码分析系列地址

作者:赵润泽,系列地址