FutureTask和DatagramChannel引发的死锁

最近线上服务出现了一次死锁,顺着jstack的log查了查原因。发现是jdk的bug,查看openjdk的bug列表,有类似的问题,官方将在11版本修复。

我下载了官网最新的jdk,8u171 8u172和jdk 10.0.1都能复现。

https://bugs.openjdk.java.net/browse/JDK-8138622

这篇文章涉及到的jdk源码我没有贴上来,因为太长了,需要读者自己查看源码。

0x01 问题复现代码

根据线上出问题的服务的jstack信息分析后,编写问题复现代码。由于是udp通信,所有需要服务端和客户端。客户端采用java编写,服务端为了方便,采用python3编写。

  • 客户端代码 ( java ) :

public class App {



    private static ExecutorService threadPool = new ThreadPoolExecutor(40, 80, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1000));



    public static void main(String[] args) {



        for (int i = 0; i < 200; i++) {

            RunClass task = new RunClass();

            FutureTask futureTask = new FutureTask<>(task);

            threadPool.submit(futureTask);

            try {

                String a = futureTask.get(3, TimeUnit.MILLISECONDS);

            } catch (InterruptedException e) {

                e.printStackTrace();

            } catch (ExecutionException e) {

                e.printStackTrace();

            } catch (TimeoutException e) {

                futureTask.cancel(true);

            }

        }

        System.out.println("over");

    }

}



class RunClass implements Callable {



    @Override

    public String call() throws Exception {

        go();

        return null;

    }



    private void go() throws IOException {

        DatagramChannel channel = DatagramChannel.open();

        channel.configureBlocking(false);

        channel.connect(new InetSocketAddress("127.0.0.1", 9999));



        byte[] b = "123".getBytes("utf-8");

        System.out.println(b.length);

        ByteBuffer byteBuffers = ByteBuffer.allocate(b.length);

        byteBuffers.put(b);

        Selector selector = Selector.open();

        channel.register(selector, SelectionKey.OP_READ);



        byteBuffers.flip();

        channel.write(byteBuffers);



        while (selector.select() > 0) {

            Iterator iterator = selector.selectedKeys().iterator();

            while (iterator.hasNext()) {

                SelectionKey key = null;

                try {

                    key = (SelectionKey) iterator.next();

                    iterator.remove();

                    if (key.isReadable()) {

                        reveice(key);

                        return;

                    }

                    if (key.isWritable()) {

                    }

                } catch (IOException e) {

                    e.printStackTrace();

                    try {

                        key.cancel();

                        key.channel().close();

                    } catch (ClosedChannelException cex) {

                        e.printStackTrace();

                    }

                }

            }

        }

    }



    private void reveice(SelectionKey key) throws IOException {

        DatagramChannel channel = (DatagramChannel) key.channel();

        ByteBuffer byteBuffers2 = ByteBuffer.allocate(1024);

        channel.receive(byteBuffers2);

        Charset charset = Charset.forName("UTF-8");

        CharsetDecoder decoder = charset.newDecoder();

        byteBuffers2.flip();

        CharBuffer charBuffer = decoder.decode(byteBuffers2.asReadOnlyBuffer());

        System.out.println(charBuffer.toString());

        System.out.println("done");

    }

}

  • 服务端代码 ( python3 ) :

#coding:utf-8

import socket

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# 绑定 客户端口和地址:

s.bind(('127.0.0.1', 9999))

print('Bind UDP on 9999...')

while True:

    # 接收数据 自动阻塞 等待客户端请求:

    data,addr=s.recvfrom(1024)

    print("Receive from %s:%s"% addr)

    print("Receive %s"% data.decode("utf-8"))

    s.sendto("Hello ".encode("utf-8")+ data,addr)

0x02 异常栈


Found one Java-level deadlock:



"main":

  waiting to lock monitor 0x00007fb81b149b00 (object 0x0000000747b8ffb8, a java.lang.Object),

  which is held by "pool-1-thread-11"

"pool-1-thread-11":

  waiting to lock monitor 0x00007fb81a918200 (object 0x0000000747d208b0, a java.lang.Object),

  which is held by "main"



Java stack information for the threads listed above:



"main":

    at sun.nio.ch.DatagramChannelImpl.implCloseSelectableChannel(java.base@10.0.1/DatagramChannelImpl.java:1005)

    - waiting to lock <0x0000000747b8ffb8> (a java.lang.Object)

    at java.nio.channels.spi.AbstractSelectableChannel.implCloseChannel(java.base@10.0.1/AbstractSelectableChannel.java:234)

    at java.nio.channels.spi.AbstractInterruptibleChannel$1.interrupt(java.base@10.0.1/AbstractInterruptibleChannel.java:162)

    - locked <0x0000000747b8ff68> (a java.lang.Object)

    at java.lang.Thread.interrupt(java.base@10.0.1/Thread.java:1019)

    - locked <0x0000000747d208b0> (a java.lang.Object)

    at java.util.concurrent.FutureTask.cancel(java.base@10.0.1/FutureTask.java:173)

    at main.App.main(App.java:33)

"pool-1-thread-11":

    at java.lang.Thread.blockedOn(java.base@10.0.1/Thread.java:239)

    - waiting to lock <0x0000000747d208b0> (a java.lang.Object)

    at java.lang.System$2.blockedOn(java.base@10.0.1/System.java:2115)

    at java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(java.base@10.0.1/AbstractInterruptibleChannel.java:208)

    at java.nio.channels.spi.AbstractInterruptibleChannel.end(java.base@10.0.1/AbstractInterruptibleChannel.java:195)

    at sun.nio.ch.DatagramChannelImpl.receive(java.base@10.0.1/DatagramChannelImpl.java:379)

    - locked <0x0000000747b8ff98> (a java.lang.Object)

    at sun.nio.ch.DatagramChannelImpl.connect(java.base@10.0.1/DatagramChannelImpl.java:754)

    - locked <0x0000000747b8ff88> (a java.lang.Object)

    - locked <0x0000000747b8ffb8> (a java.lang.Object)

    - locked <0x0000000747b8ffa8> (a java.lang.Object)

    - locked <0x0000000747b8ff98> (a java.lang.Object)

    at main.RunClass.go(App.java:51)

    at main.RunClass.call(App.java:44)

    at main.RunClass.call(App.java:40)

    at java.util.concurrent.FutureTask.run(java.base@10.0.1/FutureTask.java:264)

    at java.util.concurrent.Executors$RunnableAdapter.call(java.base@10.0.1/Executors.java:514)

    at java.util.concurrent.FutureTask.run(java.base@10.0.1/FutureTask.java:264)

    at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@10.0.1/ThreadPoolExecutor.java:1135)

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@10.0.1/ThreadPoolExecutor.java:635)

    at java.lang.Thread.run(java.base@10.0.1/Thread.java:844)



Found 1 deadlock.

0x03 死锁发生条件

  • String a = futureTask.get(3, TimeUnit.MILLISECONDS);

超时时间设置的较短

  • futureTask.cancel(true);

cancel传入true

0x04 分析

根据异常栈可知:

  • main线程拿着Thread对象的blockerLock锁,在等待DatagramChannelImpl对象的stateLock锁

  • pool-1-thread-11线程拿着DatagramChannelImpl对象的stateLock锁,在等待Thread对象的blockerLock锁

那么,为什么会出现这种情况呢?我们跟着源码看看代码做了什么。

我们先来看执行UDP通信的线程在做什么:

  1. 首先,DatagramChannel对象执行了connect方法。udp和tcp不一样,udp通信是不需要建立连接的,那么这个connect是在干什么呢?根据源码我们可以看到,这里主要是为了清除已经收到的udp包。这个方法首先就拿到了三个锁,其中就有我们关注的stateLock锁。

为什么会有stateLock锁呢?因为这里有个状态转换,这个DatagramChannel对象的状态要从ST_UNINITIALIZED转换到ST_CONNECTED,这里只有完成了connect方法里的所有步骤,才算完成状态转换,才会释放锁。

  1. 我们继续看connect方法中的receive方法做了些什么。receive方法中,在执行接受数据前执行了一个begin方法,在完成后执行了end方法。我们先来看看begin方法,begin方法调用了一个叫做blockedOn的方法,这里是向当前线程注册了一个被interrupt之后的事件,也就是说当前线程被interrupt的话,会调用注册的这个方法。blockedOn方法里会拿到当前线程的blockerLock锁,这里我们关注的第二个锁就出现了。end方法,其实就是取消注册当前线程被interrupt之后的事件,end方法也会调用blockedOn方法,所以也需要拿到blockerLock锁。

接下来,我们再来看看main线程做了些什么:

  1. main线程在调用FutureTask对象的get方法超时之后,就调用了FutureTask对象的cancel方法。如果cancel方法传入的参数为true,那么在cancel方法里,会调用执行线程的interrupt方法,也就是上面说的执行udp通信的线程的interrupt方法。
  1. main线程调用了执行线程的interrupt方法后,执行线程会拿到自己的Thread对象的blockerLock锁,然后调用通过blockedOn方法注册的事件。
  1. 翻看源码我们可以发现,DatagramChannel对象通过blockedOn方法注册的事件就是自己的implCloseSelectableChannel方法,而调用implCloseSelectableChannel方法时,会拿到stateLock锁。

结合着源码和异常栈,我们可以得出死锁流程是这个样子的:

  1. 首先udp线程拿着stateLock锁准备接受数据。

  2. 然后在真正读数据之前向当前线程注册了一个interrupt事件。

  3. 之后读完数据之后准备获取当前线程的blockerLock锁取消注册的interrupt事件。

  4. 这时,因为timeout时间已到,触发线程的interrupt事件,线程拿着blockerLock锁去调用implCloseSelectableChannel方法,而implCloseSelectableChannel方法需要stateLock锁。

至此,死锁就形成了。

0x05 总结

对于死锁问题,只要跟着异常栈和源码慢慢分析就好。