如果您想了解Netty与ApacheMINA的知识,那么本篇文章将是您的不二之选。同时我们将深入剖析ApacheMINA、ApacheMINA---[使用JMX来管理MINA应用]、ApacheMin
如果您想了解Netty与Apache MINA的知识,那么本篇文章将是您的不二之选。同时我们将深入剖析Apache MINA、Apache MINA --- [使用JMX来管理MINA应用]、Apache Mina -2、apache mina 学习笔记之一:mina简介和环境搭建的各个方面,并给出实际的案例分析,希望能帮助到您!
本文目录一览:- Netty与Apache MINA
- Apache MINA
- Apache MINA --- [使用JMX来管理MINA应用]
- Apache Mina -2
- apache mina 学习笔记之一:mina简介和环境搭建
Netty与Apache MINA
它们都提供大致相同的功能。我应该选择哪一个来开发我的高性能TCP服务器?优点和缺点是什么?
参考链接:
Apache MINA(源代码)
净资产(来源)
答案1
小编典典虽然MINA和Netty具有相同的抱负,但它们在实践中却大相径庭,因此您应该仔细考虑自己的选择。我们很幸运,因为我们在MINA方面拥有丰富的经验,并且有时间陪伴Netty。我们特别喜欢更简洁的API和更好的文档。在纸上的表现似乎也更好。更重要的是,我们知道Trustin
Lee将随时回答我们遇到的任何问题,他当然做到了。
我们发现Netty的一切变得更容易。期。在尝试重新实现与MINA相同的功能时,我们是从头开始的。通过遵循出色的文档和示例,我们最终以更少得多的代码获得了更多的功能。
Netty管道对我们来说更好。它比MINA更为简单,在MINA中,一切都是处理程序,由您决定是要处理上游事件,下游事件,还是同时处理更多低级事件。在“重放”解码器中吞噬字节几乎是一种乐趣。能够如此轻松地即时重新配置管道也非常好。
但是Netty的最吸引人的地方是恕我直言,它具有创建“覆盖一个”的管道处理程序的能力。您可能已经在文档中阅读了有关coverage注释的信息,但是从本质上讲,它使您可以在一行代码中给出状态。由于没有混乱,没有会话映射,同步和类似的东西,我们仅能够声明常规变量(例如“用户名”)并使用它们。
但是后来我们遇到了障碍。我们已经在MINA下拥有一个多协议服务器,其中我们的应用程序协议基于TCP /
IP,HTTP和UDP运行。当我们切换到Netty时,我们在几分钟内将SSL和HTTPS添加到了列表中!到目前为止,一切都还不错,但是当涉及到UDP时,我们意识到我们已经滑倒了。MINA对我们非常友好,因为我们可以将UDP视为“连接”协议。在Netty下,没有这样的抽象。UDP是无连接的,Netty将其视为无连接。与MINA相比,Netty在更低的级别上暴露了UDP的更多无连接性。在Netty下使用UDP可以做的事情比在MINA提供的更高层次的抽象下不能做的,但是我们依靠它。
添加“连接的UDP”包装器或其他东西并不是那么简单。鉴于时间限制,并且在Trustin的建议下,最好的方法是在Netty中实现我们自己的运输提供商,但这并不是很快,所以我们最终不得不放弃Netty。
因此,仔细研究它们之间的差异,并迅速进入一个阶段,您可以测试任何棘手的功能是否按预期工作。如果您对Netty会做的工作感到满意,那么我会毫不犹豫地在MINA上使用它。如果您要从MINA迁移到Netty,则同样适用,但是值得注意的是,这两个API确实有很大的不同,您应该考虑对Netty进行虚拟重写-
您不会后悔的!
Apache MINA
apache mina是高可用的网络通信框架,基于NIO非阻塞实现。 在MINA之上实现的包括FtpServer,
异步web框架asyncweb,纯java的SSH的实现SSHD,Vysper(即时通讯)等。
其主要特征包括:
1.多种传输类型的API
如TCP/UDP,串口通讯RXTX,In-VM Pipe
2.使用过滤器作为接口,方便扩展
使用方式like>> acceptor.addFilter("codec",new ProtocolFactory(xxxx));
3.同时提供底层和高层的API
底层使用ByteBuffer,高层可使用用户自定义的消息或者对象
4.高客户化的线程模型
5.过载保护和流量节流
6.JMX管理
使用JMX可以方便对应用进行管理,tomcat也是如此
7.使用StreamHandler支持流IO
8.与PicoContainer,Spring很好的集成
9.从netty可以平滑的迁移
主要用到的类:包括IoAcceptor,Filter,Handler,Session
讲到MINA,首先得了解NIO,NIO即同步非阻塞IO,这里有两个关键字,同步和非阻塞。
同步是指有一个线程去轮询IO,查看IO是否就绪,而非阻塞就是其他调用IO读写的线程不必在IO非就绪时,可以做其他事。 由此变可以猜想, 应该就是有一个单独的线程在不断的轮询IO,当此线程检测到某个IO就绪时,然后去告知相应的要真正进行IO读写的线程,相关线程收到通知,进行IO读写。
而实际上,查看NIO源码实现可知,去轮询IO状态的就是一个Selector,调用 selector.select()方法会去轮询IO,当没有数据读写时,该方法会阻塞。 NIO的同步就在于此,关于非阻塞,就涉及到缓冲区buffer和通道channel。
通常来讲,一个连接建立,并不是马上就会有数据到达,在BIO中,线程必须全天候的阻塞等待,以保障能正确的完成数据的IO。 为了解决这个问题,引入了缓冲区,数据到达,先将数据存入缓冲区,再将缓存区交给线程,最后完成IO。 而通道channel,就是IO发生时的入口。 其java调用就是 channel.write(buffer)
epoll和NIO:
epoll和NIO都是使用缓冲区和channel, NIO是使用线程可以是单线程selector处理多个channel,channel首先向selector注册,selector注册的事件有ON_CONNECT,ON_ACCEPT,ON_READ,ON_WRITE。selector会轮询IO, 当有相应的事件发生时,selector去通知相应的channel。 epoll 是当IO就绪时,会自动触发消息通知。 可以认为epoll是更高级的NIO。
传统的NIO阻塞在Selector上,但还可以对其进行优化,优化方式是对selector分解为Reactor,将不同的事件分开,每个Reactor只负责一种事件,设置不同的阻塞级别:ON_CONNECT连接阻塞,ON_ACCEPT接受阻塞,,ON_READ读阻塞,ON_WRITE写阻塞。 这样可以减少轮询时间,而且线程不需遍历所有的时事件,只需遍历自己感兴趣的即可。 如图1所示。
AIO:异步IO,按理说比NIO更高级,为啥不用呢? 很简单,Linux不支持AIO。
selector.open() 的时候,会创建一个自己和自己的链接?为什么?
因为selector.select()时会阻塞,等待IO就绪,如果此时有新的channel加入,则selector.select()需要被唤醒,然后重新select()最新的channel集合。 而要唤醒线程,需要调用selector.wakeup()方法。 而唤醒原理如下:
被select()阻塞的线程有三种唤醒方式:
1.有数据可读写,或者有异常
2.阻塞时间超时timeout
3.收到non-block信号
除去第 二钟方式,第三种non-block适用于Linux系统,如kill 方式。 所以,selector.open()时打开一个自身的连接,就是用来唤醒线程的。
ARP transport?
PicoContainer?
Apache MINA --- [使用JMX来管理MINA应用]
要让MINA应用可以被JMX管理,我们需要执行以下几个步骤:
1.创建MBean服务.
// create a JMX MBean Server server instance
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
2.实例化所有的MBean(IoAcceptor,IoFilter).
// create a JMX-aware bean that wraps a MINA IoService object.In this case, a NioSocketAcceptor.
IoServiceMBean acceptorMBean = new IoServiceMBean(acceptor);
3.注册MBean.
// create a JMX ObjectName. This has to be in a specific format.
ObjectName acceptorName = new ObjectName( acceptor.getClass().getPackage().getName() +
":type=acceptor,name=" + acceptor.getClass().getSimpleName());
// register the bean on the MBeanServer. Without this line, no JMX will happen for
// this acceptor.
mBeanServer.registerMBean( acceptorMBean, acceptorName );
开启服务:
java -classpath <CLASSPATH> }}{{{}org.apache.mina.example.imagine.step3.server.ImageServer
开启JConsole:
JAVA_HOME/bin/jconsole.exe
Apache Mina -2
我们可以了解到 mina 是个异步通信框架,一般使用场景是服务端开发,长连接、异步通信使用 mina 是及其方便的。不多说,看例子。
本次 mina 使用的例子是使用 maven 构建的,过程中需要用到的 jar 包如下:
<!-- mina -->
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-integration-beans</artifactId>
<version>2.0.16</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
<version>2.0.16</version>
</dependency>
导入 jar 包,pom 文件会报错,具体错误如下:
Missing artifact org.apache.mina:mina-core:bundle:2.0.16 pom.xml
1
原因是因为缺少 maven-bundle-plugin 导入即可
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
</plugin>
</plugins>
</build>
本次 mina 机制的消息通讯接口规范主要为:
包头 2 个字节,包长 2 个字节,协议类型 2 个字节,数据包标识码 8 个字节,报文正文内容,校验码 4 个字节,包尾 2 个字节
mina 与 spring 结合后,使用更加方便。
spring 配置文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
default-lazy-init="false">
<bean>
<property name="customEditors">
<map>
<entry key="java.net.SocketAddress"
value="org.apache.mina.integration.beans.InetSocketAddressEditor"></entry>
</map>
</property>
</bean>
<bean id="ioAcceptor"
init-method="bind" destroy-method="unbind">
<!-- 端口号 -->
<property name="defaultLocalAddress" value=":8888"></property>
<!-- 绑定自己实现的 handler -->
<property name="handler" ref="serverHandler"></property>
<!-- 声明过滤器的集合 -->
<property name="filterChainBuilder" ref="filterChainBuilder"></property>
<property name="reuseAddress" value="true" />
</bean>
<bean id="filterChainBuilder"
>
<property name="filters">
<map>
<!--mina 自带的线程池 filter -->
<entry key="executor" value-ref="executorFilter"></entry>
<entry key="mdcInjectionFilter" value-ref="mdcInjectionFilter" />
<!-- 自己实现的编解码器 filter -->
<entry key="codecFilter" value-ref="codecFilter" />
<!-- 日志的 filter -->
<entry key="loggingFilter" value-ref="loggingFilter" />
<!-- 心跳 filter -->
<entry key="keepAliveFilter" value-ref="keepAliveFilter" />
</map>
</property>
</bean>
<!-- executorFilter 多线程处理 -->
<bean id="executorFilter"/>
<bean id="mdcInjectionFilter">
<constructor-arg value="remoteAddress" />
</bean>
<!-- 日志 -->
<bean id="loggingFilter"/>
<!-- 编解码 -->
<bean id="codecFilter">
<constructor-arg>
<!-- 构造函数的参数传入自己实现的对象 -->
<bean></bean>
</constructor-arg>
</bean>
<!-- 心跳检测 filter -->
<bean id="keepAliveFilter">
<!-- 构造函数的第一个参数传入自己实现的工厂 -->
<constructor-arg>
<bean></bean>
</constructor-arg>
<!-- 第二个参数需要的是 IdleStatus 对象,value 值设置为读写空闲 -->
<constructor-arg type="org.apache.mina.core.session.IdleStatus"
value="BOTH_IDLE">
</constructor-arg>
<!-- 心跳频率,不设置则默认 5 -->
<property name="requestInterval" value="1500" />
<!-- 心跳超时时间,不设置则默认 30s -->
<property name="requestTimeout" value="30" />
<!-- 默认 false,比如在心跳频率为 5s 时,实际上每 5s 会触发一次 KeepAliveFilter 中的 session_idle 事件,
该事件中开始发送心跳包。当此参数设置为 false 时,对于 session_idle 事件不再传递给其他 filter,如果设置为 true,
则会传递给其他 filter,例如 handler 中的 session_idle 事件,此时也会被触发 -->
<property name="forwardEvent" value="true" />
</bean>
<!-- 自己实现的 handler-->
<bean id="serverHandler"/>
</beans>
mina 核心包括 IOhandler 处理器,编码工厂(包含编码器,解码器)等核心。
服务端代码如下:
编码工厂 (NSMinaCodeFactory)如下所示:
public class NSMinaCodeFactory implements ProtocolCodecFactory {
private final NSProtocalEncoder encoder;
private final NSProtocalDecoder decoder;
public NSMinaCodeFactory() {
this(Charset.forName("utf-8"));
}
public NSMinaCodeFactory(Charset charset) {
encoder = new NSProtocalEncoder();
decoder = new NSProtocalDecoder();
}
public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {
// TODO Auto-generated method stub
return decoder;
}
public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {
// TODO Auto-generated method stub
return encoder;
}
}
编码器 —— 负责将需要发送给客户端的数据进行编码,然后发送给客户端
public class NSProtocalEncoder extends ProtocolEncoderAdapter {
private static final Logger logger = Logger.getLogger(NSProtocalEncoder.class);
@SuppressWarnings("unused")
private final Charset charset = Charset.forName("GBK");
/**
* 在此处实现包的编码工作,并把它写入输出流中
*/
public void encode(IoSession session, Object message,
ProtocolEncoderOutput out) throws Exception {
// TODO Auto-generated method stub
if(message instanceof BaseMessageForClient){
BaseMessageForClient clientmessage = (BaseMessageForClient)message;
byte [] packhead_arr = clientmessage.getPackHead ().getBytes (charset);// 包头 2 个字节
byte [] length_arr = ByteTools.intToByteArray (clientmessage.getLength ()+19, 2);// 包长
byte [] funcid_arr = ByteTools.intToByteArray (clientmessage.getFuncid (), 1);// 协议类型
byte [] packetIdCode_arr = ByteTools.longToByteArray (clientmessage.getPacketIdCode (), 8);// 数据包标识码
byte [] content_arr = clientmessage.getContent ().getBytes (charset);// 内容
byte [] checkcode_arr = ByteTools.longToByteArray (clientmessage.getCheckCode (), 4);// 校验码
byte [] packtail_arr = clientmessage.getPackTail ().getBytes ();// 包尾
IoBuffer buffer = IoBuffer.allocate(packhead_arr.length + length_arr.length + funcid_arr.length + packetIdCode_arr.length+ content_arr.length + checkcode_arr.length + packtail_arr.length);
buffer.setAutoExpand(true);
buffer.put(packhead_arr);
buffer.put(length_arr);
buffer.put(funcid_arr);
buffer.put(packetIdCode_arr);
buffer.put(content_arr);
buffer.put(checkcode_arr);
buffer.put(packtail_arr);
buffer.flip();
out.write(buffer);
out.flush();
buffer.free();
}else{
String value = (String)message;
logger.warn("encode message:" + message);
IoBuffer buffer = IoBuffer.allocate(value.getBytes().length);
buffer.setAutoExpand(true);
if(value != null){
buffer.put(value.trim().getBytes());
}
buffer.flip();
out.write(buffer);
out.flush();
buffer.free();
}
}
}
解码器 —— 负责将客户端发送过来的数据进行解码变换为对象,传输给 IoHandler 处理器进行处理。本解码器包含了断包问题解决。
public class NSProtocalDecoder implements ProtocolDecoder {
private static final Logger logger = Logger.getLogger(NSProtocalDecoder.class);
private final AttributeKey context = new AttributeKey(getClass(), "context");
private final Charset charset = Charset.forName("GBK");
private final String PACK_HEAD = "$$"; // 包头
private final String PACK_TAIL = "\r\n"; // 包尾
// 请求报文的最大长度 100k
private int maxPackLength = 102400;
public int getMaxPackLength() {
return maxPackLength;
}
public void setMaxPackLength(int maxPackLength) {
if (maxPackLength <= 0) {
throw new IllegalArgumentException ("请求报文最大长度:" + maxPackLength);
}
this.maxPackLength = maxPackLength;
}
private Context getContext(IoSession session) {
Context ctx;
ctx = (Context) session.getAttribute(context);
if (ctx == null) {
ctx = new Context();
session.setAttribute(context, ctx);
}
return ctx;
}
public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
throws Exception {
Long start = System.currentTimeMillis();
// 报文前缀长度 包头 2 个字节,包长 2 个字节,协议类型 2 个字节,数据包标识码 8 个字节,校验码 4 个字节,包尾 2 个字节
final int packHeadLength = 19;
// 先获取上次的处理上下文,其中可能有未处理完的数据
Context ctx = getContext(session);
// 先把当前 buffer 中的数据追加到 Context 的 buffer 当中
ctx.append(in);
// 把 position 指向 0 位置,把 limit 指向原来的 position 位置
IoBuffer buf = ctx.getBuffer();
buf.flip();
// 然后按数据包的协议进行读取
while (buf.remaining() >= packHeadLength) {
logger.debug ("test 长度 1:" + buf.remaining ());
buf.mark();
// 读取包头 2 个字节
String packhead = new String(new byte[]{buf.get(),buf.get()});
logger.debug ("包头:" + packhead);
if(PACK_HEAD.equals(packhead)){
// 读取包的长度 2 个字节 报文的长度,不包含包头和包尾
byte[] length_byte = new byte[]{buf.get(),buf.get()};
byte[] length_byte_arr = new byte[]{0,0,0,0};
length_byte_arr[2] = length_byte[0];
length_byte_arr[3] = length_byte[1];
int length = ByteTools.byteArrayToInt(length_byte_arr);
logger.debug ("长度:" + length);
logger.debug ("test 长度 1:" + buf.remaining ());
// 检查读取是否正常,不正常的话清空 buffer
if (length < 0 || length > maxPackLength) {
logger.debug ("报文长度 [" + length + "] 超过最大长度:" + maxPackLength
+ "或者小于 0, 清空 buffer");
buf.clear();
break;
//packHeadLength - 2 : 减去包尾的长度,
//length - 2 <= buf.remaining () :代表 length - 本身长度占用的两个字节 - 包头长度
}else if(length >= packHeadLength && length - 4 <= buf.remaining()){
// 读取协议类型 2 个字节
byte[] funcid_byte = new byte[]{buf.get()};
byte[] funcid_byte_arr = new byte[]{0,0,0,0};
//funcid_byte_arr[2] = funcid_byte[0];
funcid_byte_arr[3] = funcid_byte[0];
int funcid = ByteTools.byteArrayToInt(funcid_byte_arr);
logger.warn ("协议类型:" + funcid);
// 读取数据包标识码 8 个字节
byte[] packetIdCode_byte = new byte[]{buf.get(),buf.get(),buf.get(),buf.get(),buf.get(),buf.get(),buf.get(),buf.get()};
long packetIdCode = ByteTools.byteArrayToLong(packetIdCode_byte);
logger.debug ("数据包标识码:" + packetIdCode);
// 读取报文正文内容
int oldLimit = buf.limit();
logger.debug("limit:" + (buf.position() + length));
// 当前读取的位置 + 总长度 - 前面读取的字节长度 - 校验码
buf.limit(buf.position() + length - 19);
String content = buf.getString(ctx.getDecoder());
buf.limit(oldLimit);
logger.debug ("报文正文内容:" + content);
CRC32 crc = new CRC32();
crc.update(content.getBytes("GBK"));
// 读取校验码 4 个字节
byte[] checkcode_byte = new byte[]{buf.get(),buf.get(),buf.get(),buf.get()};
byte[] checkcode_byte_arr = new byte[]{0,0,0,0,0,0,0,0};
checkcode_byte_arr[4] = checkcode_byte[0];
checkcode_byte_arr[5] = checkcode_byte[1];
checkcode_byte_arr[6] = checkcode_byte[2];
checkcode_byte_arr[7] = checkcode_byte[3];
long checkcode = ByteTools.byteArrayToLong(checkcode_byte_arr);
logger.debug ("校验码:" + checkcode);
// 验证校验码
if(checkcode != crc.getValue()){
// 如果消息包不完整,将指针重新移动消息头的起始位置
buf.reset();
break;
}
// 读取包尾 2 个字节
String packtail = new String(new byte[]{buf.get(),buf.get()});
logger.debug ("包尾:" + packtail);
if(!PACK_TAIL.equals(packtail)){
// 如果消息包不完整,将指针重新移动消息头的起始位置
buf.reset();
break;
}
BaseMessageForServer message = new BaseMessageForServer();
message.setLength(length);
message.setCheckCode(checkcode);
message.setFuncid(funcid);
message.setPacketIdCode(packetIdCode);
message.setContent(content);
out.write(message);
}else{
// 如果消息包不完整,将指针重新移动消息头的起始位置
buf.reset();
break;
}
}else{
// 如果消息包不完整,将指针重新移动消息头的起始位置
buf.reset();
break;
}
}
if (buf.hasRemaining()) {
// 将数据移到 buffer 的最前面
IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(true);
temp.put(buf);
temp.flip();
buf.clear();
buf.put(temp);
} else {// 如果数据已经处理完毕,进行清空
buf.clear();
}
}
public void finishDecode(IoSession session, ProtocolDecoderOutput out)
throws Exception {
// TODO Auto-generated method stub
}
public void dispose(IoSession session) throws Exception {
// TODO Auto-generated method stub
}
// 记录上下文,因为数据触发没有规模,很可能只收到数据包的一半
// 所以,需要上下文拼起来才能完整的处理
private class Context {
private final CharsetDecoder decoder;
private IoBuffer buf;
private int matchCount = 0;
private int overflowPosition = 0;
private Context() {
decoder = charset.newDecoder();
buf = IoBuffer.allocate(3000).setAutoExpand(true);
}
public CharsetDecoder getDecoder() {
return decoder;
}
public IoBuffer getBuffer() {
return buf;
}
@SuppressWarnings("unused")
public int getOverflowPosition() {
return overflowPosition;
}
@SuppressWarnings("unused")
public int getMatchCount() {
return matchCount;
}
@SuppressWarnings("unused")
public void setMatchCount(int matchCount) {
this.matchCount = matchCount;
}
@SuppressWarnings("unused")
public void reset() {
overflowPosition = 0;
matchCount = 0;
decoder.reset();
}
public void append(IoBuffer in) {
getBuffer().put(in);
}
}
}
NSMinaHandler—— 处理器,处理业务数据。继承 IoHandlerAdapter 接口,主要重写 messageReceived 方法
public class NSMinaHandler extends IoHandlerAdapter {
private final Logger logger = Logger.getLogger(NSMinaHandler.class);
public static ConcurrentHashMap<Long, IoSession> sessionHashMap = new ConcurrentHashMap<Long, IoSession>();
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
session.closeOnFlush();
logger.error("session occured exception, so close it."
+ cause.getMessage());
}
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
BaseMessageForServer basemessage = (BaseMessageForServer) message;
logger.debug ("客户端"
+ ((InetSocketAddress) session.getRemoteAddress()).getAddress()
.getHostAddress () + "连接成功!");
session.setAttribute("type", message);
String remoteAddress = ((InetSocketAddress) session.getRemoteAddress())
.getAddress().getHostAddress();
session.setAttribute("ip", remoteAddress);
// 组装消息内容,返回给客户端
BaseMessageForClient messageForClient = new BaseMessageForClient();
messageForClient.setFuncid(2);
if (basemessage.getContent().indexOf("hello") > 0) {
// 内容
messageForClient.setContent ("hello, 我收到您的消息了!");
} else {
// 内容
messageForClient.setContent ("恭喜,您已经入门!");
}
// 校验码生成
CRC32 crc32 = new CRC32();
crc32.update(messageForClient.getContent().getBytes());
//crc 校验码
messageForClient.setCheckCode(crc32.getValue());
// 长度
messageForClient
.setLength(messageForClient.getContent().getBytes().length);
// 数据包标识码
messageForClient.setPacketIdCode(basemessage.getPacketIdCode());
session.write(messageForClient);
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
logger.debug("messageSent:" + message);
}
@Override
public void sessionCreated(IoSession session) throws Exception {
logger.debug("remote client [" + session.getRemoteAddress().toString()
+ "] connected.");
Long time = System.currentTimeMillis();
session.setAttribute("id", time);
sessionHashMap.put(time, session);
}
@Override
public void sessionClosed(IoSession session) throws Exception {
logger.debug("sessionClosed");
session.closeOnFlush();
sessionHashMap.remove(session.getAttribute("id"));
}
@Override
public void sessionIdle(IoSession session, IdleStatus status)
throws Exception {
logger.debug("session idle, so disconnecting......");
session.closeOnFlush();
logger.warn("disconnected");
}
@Override
public void sessionOpened(IoSession session) throws Exception {
logger.debug("sessionOpened.");
}
}
还有个就是心跳工厂:
public class NSMinaKeepAliveMessageFactory implements KeepAliveMessageFactory {
private final Logger logger = Logger
.getLogger(NSMinaKeepAliveMessageFactory.class);
private BaseMessageForServer basemessage;
/** 心跳包内容 */
private static long packetIdCode = 0;
/**
* 判断是否心跳请求包 是的话返回 true
*/
public boolean isRequest(IoSession session, Object message) {
// TODO Auto-generated method stub
if (message instanceof BaseMessageForServer) {
basemessage = (BaseMessageForServer) message;
// 心跳包方法协议类型
if (basemessage.getFuncid() == 3) {
// 为 3,代表是一个心跳包,
packetIdCode = basemessage.getPacketIdCode();
return true;
} else {
return false;
}
} else {
return false;
}
}
/**
* 由于被动型心跳机制,没有请求当然也就不关注反馈 因此直接返回 false
*/
public boolean isResponse(IoSession session, Object message) {
// TODO Auto-generated method stub
return false;
}
/**
* 被动型心跳机制无请求 因此直接返回 nul
*/
public Object getRequest(IoSession session) {
// TODO Auto-generated method stub
return null;
}
/**
* 根据心跳请求 request 反回一个心跳反馈消息
*/
public Object getResponse(IoSession session, Object request) {
// 组装消息内容,返回给客户端
BaseMessageForClient messageForClient = new BaseMessageForClient();
messageForClient.setFuncid(4);
// 内容
messageForClient.setContent("2222");
// 校验码生成
CRC32 crc32 = new CRC32();
crc32.update(messageForClient.getContent().getBytes());
//crc 校验码
messageForClient.setCheckCode(crc32.getValue());
// 长度
messageForClient
.setLength(messageForClient.getContent().getBytes().length);
// 数据包标识码
messageForClient.setPacketIdCode(packetIdCode);
return messageForClient;
}
}
到此服务端代码结束。其实服务端与客户端都存在相似之处。编码,解码器都是一样的。客户端启动程序如下:
public class ClientTest {
public static void main(String[] args) {
NioSocketConnector connector = new NioSocketConnector();
// 添加过滤器
connector.getFilterChain().addLast("logger", new LoggingFilter());
// 设置编码,解码过滤器
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ByteArrayCodecFactory()));
//connector.getFilterChain ().addLast ("codec", new ProtocolCodecFilter (new TextLineCodecFactory (Charset.forName ("utf-8"))));// 设置编码过滤器
connector.setHandler (new ClientHandler ());// 设置事件处理器
ConnectFuture cf = connector.connect (new InetSocketAddress ("127.0.0.1",8888)); // 建立连接
cf.awaitUninterruptibly (); // 等待连接创建完成
BaseMessageForServer message = new BaseMessageForServer();
String content = "hello world!";
CRC32 crc = new CRC32();
try {
crc.update(content.getBytes("GBK"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
message.setFuncid(5);
message.setPacketIdCode(10000);
message.setContent(content);
message.setCheckCode(crc.getValue());
message.setLength(content.getBytes().length);
cf.getSession().write(message);
}
}
到此 mina 基本的已完成。
---------------------
原文:https://blog.csdn.net/u012151597/article/details/78847234
apache mina 学习笔记之一:mina简介和环境搭建
Mina简介:
Apache MINA(Multipurpose Infrastructure for Network Applications) 是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架。当前发行的 MINA 版本支持基于 Java NIO 技术的 TCP/UDP 应用程序开发、串口通讯程序(只在最新的预览版中提供),MINA 所支持的功能也在进一步的扩展中。
Apache Mina经常用作:
1)NIO框架库
2)客户端/服务器通信框架库
3)网络Socket通信库
Apache Mina还伴随有不少子项目:
1)Asyncweb
构建于Apache Mina异步框架之上的HTTP服务器
2)FtpServer
一个FTP服务器
3)SSHd
一个Java库,支持SSHH协议
4)Vysper
一个XMPP服务器
本文将通过一个简单的问候程序 HelloServer 来介绍 MINA 的基础架构的同时演示如何使用MINA 开发网络应用程序。
环境准备
首先到官方网站下载最新的 MINA 版本,地址是:http://mina.apache.org/downloads.html。下载之前先介绍一下 MINA 的两个版本:1.0.x 适合运行环境为 JDK1.4,1.1.x 适合 JDK1.5 的版本,两者的编译环境都需要 JDK1.5。JDK1.5 已经是非常普遍了,本文中使用 1.1.5 版本的 MINA,编译和运行所需的文件是 mina-core-1.1.5.jar。
下载 MINA 的依赖包 slf4j。MINA 使用此项目作为日志信息的输出,而 MINA 本身并不附带此项目包,请到http://www.slf4j.org/download.html地址下载 slf4j 包,slf4j 项目解压后有很多的文件,本例中只需要其中的 slf4j-api-1.4.3.jar 和 slf4j-simple-1.4.3.jar 这两个 jar 文件。如果没有这两个文件就会导致启动例子程序的时候报 org/slf4j/LoggerFactory 类没找到的错误。
当然要求机器上必须装有 1.5 或者更新版本的 JDK。
最好你应该选择一个顺手的 Java 开发环境例如 Eclipse 或者 NetBeans 之类的,可以更方便的编码和调试,虽然我们的最低要求只是一个简单的文本编辑器而已。
项目先决条件:
1、apache mina core 2.09.jar文件
2、Eclipse或MyEclipse或IDEA 编辑器
3、slf4j的jar文件
项目结构:
编写代码并执行
1、编写代码HelloServer.java,如下:
package demo.mina.echo;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
public class HelloServer {
private static final int PORT = 9123;
public static void main(String[] args){
// 监听连接的对象
IoAcceptor acceptor = new NioSocketAcceptor();
// 配置过滤器
// logger过滤器会输出所有的信息,例如新创建的会话、消息的接收、消息的发送、会话的关闭
// codec过滤器会转换二进制活协议规定的数据为消息对象,这里是处理基于文本的消息
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(
new TextLineCodecFactory(Charset.forName("UTF-8"))));
//
acceptor.setHandler(new TimeServerHandler());
// 设置输入缓冲区的大小和会话的IDLE熟悉
acceptor.getSessionConfig().setReadBufferSize(2048);
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
try {
acceptor.bind(new InetSocketAddress(PORT));
System.out.println("HelloServer started on port " + PORT);
} catch (IOException e) {
e.printStackTrace();
}
}
}
/*
* HelloServer 处理逻辑
*
*/
class TimeServerHandler extends IoHandlerAdapter {
<span> </span>//当有异常发生时触发
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
cause.printStackTrace();
session.close();
}
//收到来自客户端的消息
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
//来至客户端信息
String str = message.toString();
//来至客户端ip信息
String ip = session.getRemoteAddress().toString();
System.out.println("===> Message From " + ip +" : " + str);
session.write("Hello,Client " + ip);
//接受客户端字符串"quit"关闭当前会话连接
if(str.trim().equalsIgnoreCase("quit")){
session.close(true);
return;
}
// Calendar time = Calendar.getInstance();
// SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// session.write(df.format(time.getTime()));
// System.out.println("Time Message written...");
}
//连接被关闭时触发
@Override
public void sessionClosed(IoSession session) throws Exception {
// TODO Auto-generated method stub
System.out.println("session closed from " + session.getRemoteAddress().toString());
}
//有新连接时触发
@Override
public void sessionOpened(IoSession session) throws Exception {
// TODO Auto-generated method stub
System.out.println("session open for " + session.getRemoteAddress());
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
//System.out.println("IDLE "+session.getIdleCount(status));
}
}
2、运行HelloServer
在命令行输入telnet 127.0.0.1 9123
服务器端的输出也可以看到:
————————————————
版权声明:本文为CSDN博主「在奋斗的大道」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/zhouzhiwengang/article/details/40562775
关于Netty与Apache MINA的介绍已经告一段落,感谢您的耐心阅读,如果想了解更多关于Apache MINA、Apache MINA --- [使用JMX来管理MINA应用]、Apache Mina -2、apache mina 学习笔记之一:mina简介和环境搭建的相关信息,请在本站寻找。
本文标签: