GVKun编程网logo

S3 Select for Java 使用记录(s3 java sdk)

39

如果您对S3SelectforJava使用记录和s3javasdk感兴趣,那么这篇文章一定是您不可错过的。我们将详细讲解S3SelectforJava使用记录的各种细节,并对s3javasdk进行深入

如果您对S3 Select for Java 使用记录s3 java sdk感兴趣,那么这篇文章一定是您不可错过的。我们将详细讲解S3 Select for Java 使用记录的各种细节,并对s3 java sdk进行深入的分析,此外还有关于java NIO --selector、java NIO selector、Java NIO Selector 的使用、Java NIO Selector可以选择不超过50个SelectionKeys?的实用技巧。

本文目录一览:

S3 Select for Java 使用记录(s3 java sdk)

S3 Select for Java 使用记录(s3 java sdk)

背景

后台基本使用 Amazon 的全家桶(EC2、DynamoDB、S3、Step Fuction 等等)构建。现在需要根据访问者的 IP 确定访问者的国家或地区。

已知:

  1. 访问者 IP

  2. 一个 ipdata.csv 文件,已放置在 S3 的桶 ow-public-us 中,格式如下

    ip_from ip_to country_code country_name
    0 16777215 - -
    16777216 16777471 AU Australia
    16777472 16778239 CN China

流程

1. 引入 S3 Select

compile "com.amazonaws:aws-java-sdk-s3:1.11.379"

2. 构建 AmazonS3 对象

public AmazonS3 createAmazonS3(){
    final AwsSupport awsSupport = new AwsSupport();
    ClientConfiguration clientConfiguration = new ClientConfiguration();
    clientConfiguration.setSocketTimeout((int) TimeUnit.SECONDS.toMillis(70));
    AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard()
                                                            .withCredentials(awsSupport.getCredentials())
                                                            .withClientConfiguration(
                                                                clientConfiguration);
    // ). region
    final Region region = awsSupport.getCurrentRegion();
    if (region != null) {
        builder.withRegion(region.getName());
    }
    return builder.build();
}

3. 构建 SelectObjectContentRequest 对象

本文中输入的为 CSV 无压缩数据,输出为 Json 类型数据。

public static SelectObjectContentRequest createBaseCSVRequest(String bucket,
                                                                String key,
                                                                String query) {
    SelectObjectContentRequest request = new SelectObjectContentRequest();
    request.setBucketName(bucket);
    request.setKey(key);
    request.setExpression(query);
    request.setExpressionType(ExpressionType.SQL);

    InputSerialization inputSerialization = new InputSerialization();
    inputSerialization.setCsv(new CSVInput());
    inputSerialization.setCompressionType(CompressionType.NONE);
    request.setInputSerialization(inputSerialization);

    OutputSerialization outputSerialization = new OutputSerialization();
    outputSerialization.setJson(new JSONOutput());
    request.setOutputSerialization(outputSerialization);
    return request;
}

4. 转化 IP 为 IP LONG

将 IP 字符串 转为 long 型数值,方便进行 IP 国家地区定位。

public static long ip2Long(String ipAddress) {
    if (Strings.isNullOrEmpty(ipAddress)) {
        return 0L;
    }
    long result = 0;
    String[] ipAddressInArray = ipAddress.split("\\.");

    for (int i = 3; i >= 0; i--) {
        long ip = Long.parseLong(ipAddressInArray[3 - i]);
        // left shifting 24,16,8,0 and bitwise OR
        // 1. 192 << 24
        // 1. 168 << 16
        // 1. 1 << 8
        // 1. 2 << 0
        result |= ip << (i * 8);
    }
    return result;
}

5. 请求并获取国家地区信息

// _1 代表第一列 ip_from
// _2 代表第二列 ip_to
// _3 代表第三列 country_code
// 注意: SQL 中的变量需要用单引号括起来
SelectObjectContentResult selectObjectContentResult =
    createAmazonS3().selectObjectContent(createBaseCSVRequest("ow-public-us",
                                                                    "ipdata.csv",
                                                                    "SELECT s.\''country_code\'' FROM S3Object s WHERE s._1<=\''" +
                                                                    ipLong +
                                                                    "\'' AND s._2>=\''" +
                                                                    ipLong + "\'' LIMIT 1"));
selectObjectContentResult.getPayload()
                            .getRecordsInputStream(new SelectObjectContentEventVisitor() {
                            @Override
                            public void visit(SelectObjectContentEvent.RecordsEvent event) {
                                try {
                                String content =
                                    new String(event.getPayload().array(), "utf-8");
                                LOGGER.debug("Country is --> {}", content);
                                JsonObject object = Json.fromJson(content, JsonObject.class);
                                String country = object.get("_3").getAsString();
                                } catch (UnsupportedEncodingException e) {
                                e.printStackTrace();
                                }
                            }
                            });

预警

在编辑 S3 Select 的 SQL 语句时,使用下列形式是不支持的:

// 出错:AmazonS3Exception: The column index at line 1, column 8 is invalid. Please check the service documentation and try again. (Service: Amazon S3; Status Code: 400; Error Code: InvalidColumnIndex;
String sql = "SELECT s.\"country_code\" FROM S3Object s WHERE s._1<=\''" + ipLong +"\'' AND s._2>=\''" + ipLong + "\'' LIMIT 1";

// 出错:AmazonS3Exception: Invalid Path component, expecting either an IDENTIFIER or STAR, got: LITERAL,at line 1, column 10. (Service: Amazon S3; Status Code: 400; Error Code: ParseInvalidPathComponent;
String sql = "SELECT s.\''country_code\'' FROM S3Object s WHERE s._1<=\''" + ipLong +"\'' AND s._2>=\''" + ipLong + "\'' LIMIT 1";

// 出错:AmazonS3Exception: The column index at line 1, column 8 is invalid. Please check the service documentation and try again. (Service: Amazon S3; Status Code: 400; Error Code: InvalidColumnIndex;
String sql = "SELECT s.country_code FROM S3Object s WHERE s._1<=\''" + ipLong +"\'' AND s._2>=\''" + ipLong + "\'' LIMIT 1";

但是第一种写法在 Python 库 boto3 中是支持的,可以参见 参考2。

参考

  1. 使用 适用于 Java 的开发工具包 从对象中选择内容 - Amazon
  2. S3 Select — new revolution “at rest” - Medium

java NIO --selector

java NIO --selector

传统的IO 编程中,针对每一个客户端连接都会创建一个新的线程;

而 NIO 一个线程可以处理很多客户端的请求

我们分析源码的注解,可以归纳出:

  1.    Selector 构造方式:(常见的Seletor 构造方式) 

   

 

源码:

* <p> A selector may be created by invoking the {@link #open open} method of
* this class, which will use the system''s default {@link
* java.nio.channels.spi.SelectorProvider selector provider} to
* create a new selector. A selector may also be created by invoking the
* {@link java.nio.channels.spi.SelectorProvider#openSelector openSelector}
* method of a custom selector provider. A selector remains open until it is
* closed via its {@link #close close} method.

 翻译:

  Selector 是通过调用这个类的open()方法进行创建的,它使用的是系统默认的 选择器提供者去创建一个新的Selector;

  也可以调用spi 里的 openSelector 方法进行自定义Selector 的创建;

  在调用close 方法之前,Selector 是保持打开的;

结果:

Selector selector = Selector.open();  //Selector 构造方式

2. 关于通道的注册以及selector 管理通道发生的事件(selector 是通过事件进行监听的)

源码:
* <p> A key is added to a selector''s key set as a side effect of registering a * channel via the channel''s {@link SelectableChannel#register(Selector,int) * register} method. Cancelled keys are removed from the key set during * selection operations. The key set itself is not directly modifiable.

翻译解析:

  我们可以通提供过继承了
SelectableChannel 的类的 register 方法进行通道的注册,返回SelectionKey 在注册的同时,一个key 也会加入到选择器的键集里;
  需要注意的是 只有继承了 SelectableChannel 的类 才能够进行注册;

  
在注册方法中有两个参数  SelectionKey register(Selector sel, int ops) ;Selector 就是我们的选择器, ops 是我们需要监听的事件,

 
以socketChannel 为例:
  

//创建Selector 选择器对象
Selector selector = Selector.open();

//创建ServerSocketChannel

ServerSocketChannel socketChannel = ServerSocketChannel.open();
//设置非阻塞模式
socketChannel.configureBlocking(false);
//创建ServerSocket 绑定端口
ServerSocket socket=socketChannel.socket();
socket.bind(new InetSocketAddress(9999));

//进行通道的注册

socketChannel.register(selector, SelectionKey.OP_ACCEPT);

 

3.. SelectionKey : SelectionKey 是 java NIO 中重要的值(我们可以认为是通道注册的唯一值):

源码:
* <p> A selectable channel''s registration with a selector is represented by a * {@link SelectionKey} object. A selector maintains three sets of selection * keys:

翻译:
selector 选择器中可选择的通道注册是由
SelectionKey 这个对象表示的,也就是说,在Selector 注册的不同通道,都会有一个 SelectionKey 对应;,这个
SelectionKey 是可以追溯的,也可以通过 SelectionKey 来获取当前 SelectionKey 的通道channel;

一个选择器里维护了三种选择集 分别是如下:


源码:

* <li><p> The <i>key set</i> contains the keys representing the current
* channel registrations of this selector. This set is returned by the
* {@link #keys() keys} method. </p></li>
*
* <li><p> The <i>selected-key set</i> is the set of keys such that each
* key''s channel was detected to be ready for at least one of the operations
* identified in the key''s interest set during a prior selection operation.
* This set is returned by the {@link #selectedKeys() selectedKeys} method.
* The selected-key set is always a subset of the key set. </p></li>
*


* <li><p> The <i>cancelled-key</i> set is the set of keys that have been
* cancelled but whose channels have not yet been deregistered. This set is
* not directly accessible. The cancelled-key set is always a subset of the
* key set. </p></li>

翻译理解:

1. 键集:包含了选择器里当前通道注册的键,可以通过keys 方法进行返回这个键集;它是最全所有的键集

2.selected 键集 : 一句话就是当前selector 监听的事件(感兴趣的事件)触发返回的一个键集合, 可以通过selectKeys()方法进行获取;它是键集(1)的子类

3. cancelled 键集:是一个被取消的键集,但是通道还没有被撤销登记,这几集合不能直接被操作,也是键集(1)的子类

 

java NIO selector

java NIO selector

Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。

为什么使用Selector?

仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。因此,使用的线程越少越好。

但是,需要记住,现代的操作系统和CPU在多任务方面表现的越来越好,所以多线程的开销随着时间的推移,变得越来越小了。实际上,如果一个CPU有多个内核,不使用多任务可能是在浪费CPU能力。不管怎么说,关于那种设计的讨论应该放在另一篇不同的文章中。在这里,只要知道使用Selector能够处理多个通道就足够了。

下面是单线程使用一个Selector处理3个channel的示例图:

Selector的创建

通过调用Selector.open()方法创建一个Selector,如下:

Selector selector = Selector.open();

为了将Channel和Selector配合使用,必须将channel注册到selector上。通过SelectableChannel.register()方法来实现,如下:

向Selector注册通道

为了将Channel和Selector配合使用,必须将channel注册到selector上。通过SelectableChannel.register()方法来实现,如下:

channel.configureBlocking(false);
SelectionKey key = channel.register(selector,Selectionkey.OP_READ);

与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以。

注意register()方法的第二个参数。这是一个“interest集合”,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件:

  1. Connect
  2. Accept
  3. Read
  4. Write

通道触发了一个事件意思是该事件已经就绪。所以,某个channel成功连接到另一个服务器称为“连接就绪”。一个server socket channel准备好接收新进入的连接称为“接收就绪”。一个有数据可读的通道可以说是“读就绪”。等待写数据的通道可以说是“写就绪”。

这四种事件用SelectionKey的四个常量来表示:

  1. SelectionKey.OP_CONNECT
  2. SelectionKey.OP_ACCEPT
  3. SelectionKey.OP_READ
  4. SelectionKey.OP_WRITE

 如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如下:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

在下面还会继续提到interest集合。

SelectionKey

在上一小节中,当向Selector注册Channel时,register()方法会返回一个SelectionKey对象。这个对象包含了一些你感兴趣的属性:

  • interest集合
  • ready集合
  • Channel
  • Selector
  • 附加的对象(可选)

下面我会描述这些属性。

interest集合

就像向Selector注册通道一节中所描述的,interest集合是你所选择的感兴趣的事件集合。可以通过SelectionKey读写interest集合,像这样:

int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept  = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

可以看到,用“位与”操作interest 集合和给定的SelectionKey常量,可以确定某个确定的事件是否在interest 集合中。

ready集合

ready 集合是通道已经准备就绪的操作的集合。在一次选择(Selection)之后,你会首先访问这个ready set。Selection将在下一小节进行解释。可以这样访问ready集合:

int readySet = selectionKey.readyOps();

可以用像检测interest集合那样的方法,来检测channel中什么事件或操作已经就绪。但是,也可以使用以下四个方法,它们都会返回一个布尔类型:

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

Channel + Selector

从SelectionKey访问Channel和Selector很简单。如下:

Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();

附加的对象

可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。例如,可以附加 与通道一起使用的Buffer,或是包含聚集数据的某个对象。使用方法如下:

selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

还可以在用register()方法向Selector注册Channel的时候附加对象。如:

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

通过Selector选择通道

一旦向Selector注册了一或多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(如连接、接受、读或写)已经准备就绪的那些通道。换句话说,如果你对“读就绪”的通道感兴趣,select()方法会返回读事件已经就绪的那些通道。

下面是select()方法:

  • int select()
  • int select(long timeout)
  • int selectNow()

select()阻塞到至少有一个通道在你注册的事件上就绪了。

select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。

selectNow()不会阻塞,不管什么通道就绪都立刻返回(译者注:此方法执行非阻塞的选择操作。如果自从前一次选择操作后,没有通道变成可选择的,则此方法直接返回零。)。

select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通道变成就绪状态。如果调用select()方法,因为有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。

selectedKeys()

一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道。如下所示:

Set selectedKeys = selector.selectedKeys();

当像Selector注册Channel时,Channel.register()方法会返回一个SelectionKey 对象。这个对象代表了注册到该Selector的通道。可以通过SelectionKey的selectedKeySet()方法访问这些对象。

可以遍历这个已选择的键集合来访问就绪的通道。如下:

Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
}

这个循环遍历已选择键集中的每个键,并检测各个键所对应的通道的就绪事件。

注意每次迭代末尾的keyIterator.remove()调用。Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。

SelectionKey.channel()方法返回的通道需要转型成你要处理的类型,如ServerSocketChannel或SocketChannel等。

wakeUp()

某个线程调用select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从select()方法返回。只要让其它线程在第一个线程调用select()方法的那个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。

如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来(wake up)”。

close()

用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。

完整的示例

这里有一个完整的示例,打开一个Selector,注册一个通道注册到这个Selector上(通道的初始化过程略去),然后持续监控这个Selector的四种事件(接受,连接,读,写)是否就绪。

Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
  int readyChannels = selector.select();
  if(readyChannels == 0) continue;
  Set selectedKeys = selector.selectedKeys();
  Iterator keyIterator = selectedKeys.iterator();
  while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
  }
}

Java NIO Selector 的使用

Java NIO Selector 的使用

之前的文章已经把 Java 中 NIO 的 Buffer、Channel 讲解完了,不太了解的可以先回过头去看看。这篇文章我们就来聊聊 Selector —— 选择器。

首先 Selector 是用来干嘛的呢?不熟悉这个概念的话我们其实可以这么理解:

selector

把它当作 SQL 中的 select 语句,在 SQL 中无非就是筛选出符合条件的结果集合。而 NIO 中的 Selector 用途类似,只不过它选择出来的是有就绪 IO 事件的 Channel

IO 事件代表了 Channel 对于不同的 IO 操作所处的不同的状态,而不是对 Channel 进行 IO 操作。总共有 4 种 IO 事件的定义:

  • OP_READ 可读
  • OP_WRITE 可写
  • OP_CONNECT 连接
  • OP_ACCEPT 接收

IO 事件分类

比如 OP_READ,其就绪是指数据已经在内核态 Ready 了并且已经从内核态复制到了用户态的缓冲区,然后我们的应用程序就可以去读取数据了,这叫可读

再比如 OP_CONNECT,当某个 Channel 已经完成了握手连接,则 Channel 就会处于 OP_CONNECT 的状态。

对用户态和内核态不了解的,可以去看看之前写的 《用户态和内核态的区别》

在之前讲 BIO 模型的时候说过,用户态在发起 read 系统调用之后会一直阻塞,直到数据在内核态 Ready 并且复制到用户态的缓冲区内。如果只有一个用户还好,随便你阻塞多久。但要是这时有其他用户发请求进来了,就会一直卡在这里等待。这样串行的处理会导致系统的效率极其低下。

针对这个问题,也是有解决方案的。那就是为每个用户都分配一个线程(即 Connection Per Thread),乍一想这个思路可能没问题,但使用线程需要消耗系统的资源,例如在 JVM 中一个线程会占用较多的资源,非常昂贵。系统稍微并发多一些(例如上千),你的系统就会直接 OOM 了。而且,线程频繁的创建、销毁、切换也是一个比较耗时的操作。

而如果用 NIO,虽然不会阻塞了,但是会一直轮询,让 CPU 空转,也是一个不环保的方式。

而如果用 Selector,只需要一个线程来监听多个 Channel,而这个多个可以上千、上万甚至更多。那这些 Channel 是怎么跟 Selector 关联上的呢?

答案是通过注册,因为现在变成了 Selector 决定什么时候处理 Channel 中的事件,而注册操作则相当于将 Channel 的控制权转交给了 Selector。一旦注册上了,后续当 Channel 有就绪的 IO 事件,Selector 就会将它们选择出来执行对应的操作。

说了这么多,来看个例子吧,客户端的代码相对简单,后续再看,我们先看服务端的:

public static void main(String[] args) throws IOException {
  // 创建 selector, 管理多个 channel
  Selector selector = Selector.open();

  // 创建 ServerSocketChannel 并且绑定端口
  ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  serverSocketChannel.configureBlocking(false);
  serverSocketChannel.bind(new InetSocketAddress(8080));

  // 将 channel 注册到 selector 上
  SelectionKey serverSocketChannelKey = serverSocketChannel.register(selector, 0);
  // 由于总共有 4 种事件, 分别是 accept、connect、read 和 write,
  // 分别代表有连接请求时触发、客户端建立连接时触发、可读事件、可写事件
  // 我们可以使用 interestOps 来表明只处理有连接请求的事件
  serverSocketChannelKey.interestOps(SelectionKey.OP_ACCEPT);

  System.out.printf("serverSocketChannel %s\n", serverSocketChannelKey);
  while (true) {
    // 没有事件发生, 线程会阻塞; 有事件发生, 就会让线程继续执行
    System.out.println("start to select...");
    selector.select();
    // 换句话说, 有连接过来了, 就会继续往下走

    // 通过 selectedKeys 包含了所有发生的事件, 可能会包含 READ 或者 WRITE
    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    while (iterator.hasNext()) {
      SelectionKey key = iterator.next();
      System.out.printf("selected key %s\n", key);

      // 这里需要进行事件区分
      if (key.isAcceptable()) {
        System.out.println("get acceptable event");

        // 触发此次事件的 channel, 拿到事件一定要处理, 否则会进入非阻塞模式, 空转占用 CPU
        // 例如你可以使用 key.cancel()
        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = channel.accept();
        socketChannel.configureBlocking(false);

        // 这个 socketChannel 也需要注册到 selector 上, 相当于把控制权交给 selector
        SelectionKey socketChannelKey = socketChannel.register(selector, 0);
        socketChannelKey.interestOps(SelectionKey.OP_READ);
        System.out.printf("get socketChannel %s\n", socketChannel);
      } else if (key.isReadable()) {
        System.out.println("get readable event");

        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buf = ByteBuffer.allocate(16);
        channel.read(buf);
        buf.flip();
        ByteBufferUtil.debugRead(buf);
        key.cancel();
      }
      
      iterator.remove();
    }
  }
}

看起来有点多,但相应的注释都写了,可以先看看。其实这里的很多代码跟之前的玩转 Channel 的代码差不多的,这里抽一些我认为值得讲的解释一下。

首先就是 Selector.open(),跟 Channel 的 open 方法类似,可以理解为创建一个 selector。

其次就是 SelectionKey serverSocketChannelKey = serverSocketChannel.register(selector, 0); 了,我们调用了 serverSocketChannel 的注册方法之后,返回了一个 SelectionKey,这是个什么概念呢?

说简单点,你可以把 SelectionKey 理解为你去商场寄存柜存东西,那个机器吐给你的提取凭证

换句话说,这个 SelectionKey 就是当前这个 serverSocketChannel 注册到 selector 上的凭证。selector 会维护一个 SelectionKey 的集合,用于统一管理。

selectionkey 集合

上图中的每个 Key 都代表了一个具体的 Channel。

而至于 register 的第二个参数,我们传入的是 0,代表了当前 Selector 需要关注这个 Channel 的哪些 IO 事件。0 代表不关注任何事件,我们这里是通过 serverSocketChannelKey.interestOps(SelectionKey.OP_ACCEPT); 来告诉 Selector,对这个 Channel 只关注 OP_ACCEPT 事件。

IO 事件有 4 个,如果你想要同时监听多个 IO 事件怎么办呢?答案是通过或运算符。

serverSocketChannelKey.interestOps(SelectionKey.OP_ACCEPT | SelectionKey.OP_READ);

上面说过,NIO 虽然不阻塞,但会一直轮询占用 CPU 的资源,而 Selector 解决了这个问题。在调用完 selector.select(); 之后,线程会在这里阻塞,而不会像 NIO 一样疯狂轮询,把 CPU 拉满。所以 Selector 只会在有事件处理的时候才执行,其余时间都会阻塞,极大的减少了 CPU 资源的占用。

当客户端调用 connect 发起连接之后,Channel 就会处于 OP_CONNECT 就绪状态,selector.select(); 就不会再阻塞,会继续往下运行,即:

Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

其中 selectedKeys 这个名字也能看出来,表示被选出来的 SelectionKey。上面我们已经讨论过 Selector 维护的一种集合 —— SelectionKey 集合,接下来我们再讨论另外一种集合 —— SelectedKey 集合。

selectedkey 集合

当 Channel 有就绪 IO 事件之后,对应的 Key 就会被加入到 SelectedKey 集合中,然后这一次 While 循环会依次处理被选择出来的所有 Key。

但被选择出来的 Key 可能触发的是不同的 IO 事件,所以我们需要对 Key 进行区分。代码里区分了 OP_ACCEPT 和 OP_READ,分别讨论一下。

ServerSocketChannel 一开始 register 的时候只设定关注 OP_ACCEPT 事件,所以第一次循环只会进入 IsAcceptable 分支里,所以这里通过 iterator.next() 迭代器拿到的 SelectionKey 就是 serverSocketChannel 注册之后返回的 Key,同理拿到的 channel 的就是最开始调用 ServerSocketChannel.open(); 创建的 channel。

拿到了 ServerSocketChannel 我们就可以调用其 accept() 方法来处理建立连接的请求了,这里值得注意的是,建立连接之后,这个 SocketChannel 也需要注册到 Selector 上去,因为这些 SocketChannel 也需要将控制权交给 Selector,这样后续有就绪 IO 事件才能通过 Selector 处理。这里我们对这个 SocketChannel 只关注 OP_READ 事件。相当于把后续进来的所有的连接和 Selector 就关联上了。

Accept 事件处理成功之后,服务器这边会继续循环,然后再次在 selector.select(); 处阻塞住。

客户端这边会继续调用 write 方法向 channel 写入数据,数据 Ready 之后就会触发 OP_READ 事件,然后继续往下走,这次由于事件是 OP_READ 所以会进入 key.isReadable() 这个分支。进入这个分支之后会获取到对应的 SocketChannel,并从其中读取客户端发来的数据。

而另一个值得关注的是 iterator.remove();,每次迭代都需要把当前处理的 SelectedKey 移除,这是为什么呢?

因为对应的 Key 进入了 SelectedKey 集合之后,不会被 NIO 里的机制给移除。如果我们不去移除,那么下一次调用 selector.selectedKeys().iterator(); 会发现,上次处理的有 OP_ACCEPT 事件的 SelectionKey 还在,而这会导致上面的服务端程序抛出空指针异常。

大家可以自行将 iterator.remove(); 注释掉再试试

客户端的代码很简单,就直接给出来了:

public static void main(String[] args) throws IOException {
  SocketChannel socketChannel = SocketChannel.open();
  socketChannel.connect(new InetSocketAddress("localhost", 8080));

  ByteBuffer buffer = ByteBuffer.allocate(16);
  buffer.put("test".getBytes(StandardCharsets.UTF_8));

  buffer.flip();
  socketChannel.write(buffer);
}

如果不去移除的话,服务端会在下面这行 NPE。

socketChannel.configureBlocking(false);

为啥呢?因为此时 SelectionKey 虽然还在,ServerSocketChannel 也能拿到,但调用 channel.accept(); 的时候,并没有客户端真正在发起连接(上一个循环已经处理过真正的连接请求了,只是没有将这个 Key 从 SelectedKey 中移除)。所以 channel.accept(); 会返回一个 null,我们再对 null 调用 configureBlocking 方法,自然而然就 NPE 了。

Java NIO Selector可以选择不超过50个SelectionKeys?

Java NIO Selector可以选择不超过50个SelectionKeys?

我使用 siege对我的手工构建的文件服务器进行压力测试,它适用于小文件(小于1KB),而在使用1MB文件进行测试时,它不能按预期工作.

以下是使用小文件进行测试的结果:

neevek@~$siege -c 1000 -r 10 -b http://127.0.0.1:9090/1KB.txt
** SIEGE 2.71
** Preparing 1000 concurrent users for battle.
The server is Now under siege..      done.

Transactions:              10000 hits
Availability:             100.00 %
Elapsed time:               9.17 secs
Data transferred:           3.93 MB
Response time:              0.01 secs
Transaction rate:        1090.51 trans/sec
Throughput:             0.43 MB/sec
Concurrency:                7.29
Successful transactions:       10000
Failed transactions:               0
Longest transaction:            1.17
Shortest transaction:           0.00

以下是使用1MB文件进行测试的结果:

neevek@~$siege -c 1000 -r 10 -b http://127.0.0.1:9090/1MB.txt
** SIEGE 2.71
** Preparing 1000 concurrent users for battle.
The server is Now under siege...[error] socket: read error Connection reset by peer sock.c:460: Connection reset by peer
[error] socket: unable to connect sock.c:222: Connection reset by peer
[error] socket: unable to connect sock.c:222: Connection reset by peer
[error] socket: unable to connect sock.c:222: Connection reset by peer
[error] socket: read error Connection reset by peer sock.c:460: Connection reset by peer
[error] socket: unable to connect sock.c:222: Connection reset by peer
[error] socket: read error Connection reset by peer sock.c:460: Connection reset by peer
[error] socket: read error Connection reset by peer sock.c:460: Connection reset by peer
[error] socket: read error Connection reset by peer sock.c:460: Connection reset by peer
[error] socket: read error Connection reset by peer sock.c:460: Connection reset by peer

当siege以上述错误终止时,我的文件服务器仍然使用固定数量的WRITABLE SelectionKey旋转,即Selector.select()保持返回固定数字,比如50.

通过上面的测试,在我看来我的文件服务器不能接受不超过50个并发连接,因为当用小文件运行测试时,我注意到服务器选择1或2个SelectionKeys,当运行大文件时,它选择每次最多50个.

我试图在没有帮助的情况下增加Socket.bind()中的积压.

可能是问题的原因是什么?

编辑

更多信息:

当使用1MB文件进行测试时,我注意到siege以broken管道错误终止,并且文件服务器只接受了198个连接,尽管我指定了1000个并发连接x 10轮(1000 * 10 = 10000)来充斥服务器.

编辑2

我已经使用以下代码测试(单个类)来重现同样的问题,在这段代码中,我只接受连接,我不读或写,siege客户端终止连接重置或连接超时之前的管道错误.我还注意到Selector只能选择少于1000个键.您可以尝试下面的代码来见证问题.

public class TestNIO implements Runnable {
    ServerSocketChannel mServerSocketChannel;
    Selector mSelector;

    public static void main(String[] args) throws Exception {
        new TestNIO().start();
    }

    public TestNIO () throws Exception {
       mSelector = Selector.open();
    }

    public void start () throws Exception {
        mServerSocketChannel = ServerSocketChannel.open();
        mServerSocketChannel.configureBlocking(false);
        mServerSocketChannel.socket().bind(new InetSocketAddress(9090));
        mServerSocketChannel.socket().setSoTimeout(150000);
        mServerSocketChannel.register(mSelector,SelectionKey.OP_ACCEPT);

        int port = mServerSocketChannel.socket().getLocalPort();
        String serverName = "http://" + InetAddress.getLocalHost().getHostName() + ":" + port;
        System.out.println("Server start listening on " + serverName);

        new Thread(this).start();

    }

    @Override
    public void run() {
        try {
            Thread.currentThread().setPriority(Thread.MIN_PRIORITY);
            while (true) {
                int num = mSelector.select();

                System.out.println("SELECT = " + num + "/" + mSelector.keys().size());
                if (num > 0) {
                    Iterator<SelectionKey> keys = mSelector.selectedKeys().iterator();

                    while (keys.hasNext()) {
                        final SelectionKey key = keys.next();

                        if (key.isValid() && key.isAcceptable()) {
                            accept(key);
                        }

                    }
                    // clear the selected keys
                    mSelector.selectedKeys().clear();
                }
            }
        } catch (Exception e) {
            e.printstacktrace();
        }
    }

    private void accept (SelectionKey key) throws IOException {
        SocketChannel socketChannel = mServerSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.socket().setSoTimeout(1000000);
        socketChannel.socket().setKeepAlive(true);
        // since we are connected,we are ready to READ
        socketChannel.register(mSelector,SelectionKey.OP_READ);
    }
}

解决方法

它实际上与为ServerSocketChannel设置的默认积压值相关

http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/sun/nio/ch/ServerSocketChannelImpl.java#138

您可以通过将backlog值作为第二个参数传递给bind方法来解决此问题.

mServerSocketChannel.socket().bind(new InetSocketAddress(9090),“backlog value”)

关于S3 Select for Java 使用记录s3 java sdk的问题就给大家分享到这里,感谢你花时间阅读本站内容,更多关于java NIO --selector、java NIO selector、Java NIO Selector 的使用、Java NIO Selector可以选择不超过50个SelectionKeys?等相关知识的信息别忘了在本站进行查找喔。

本文标签: