本文共 10435 字,大约阅读时间需要 34 分钟。
/** * 注册服务端 * 单线程服务模型,使用标准的阻塞式IO,只有一个线程处理请求 */public static void startSimpleServer(AdditionService.Processor2、TThreadPoolServerprocessor) { try { TServerTransport serverTransport = new TServerSocket(9090);// 设置服务端口 // 单线程服务模型 TSimpleServer.Args tArgs = new TSimpleServer.Args(serverTransport); tArgs.processor(processor); // 客户端协议要一致 tArgs.protocolFactory(new TBinaryProtocol.Factory()); TServer server = new TSimpleServer(tArgs); System.out.println("Starting the simple server..."); server.serve(); } catch (Exception e) { e.printStackTrace(); }}
/** * 注册服务端 * 线程池服务模型,使用标准的阻塞式IO,预先创建一组线程处理请求 */public static void startMultipleServer(AdditionService.Processor这两个服务端可以处理同样的客户端:processor) { try { TServerTransport serverTransport = new TServerSocket(9090);// 设置服务端口 //线程池服务模型,使用标准的阻塞式IO,预先创建一组线程处理请求。 TThreadPoolServer.Args tArgs = new TThreadPoolServer.Args(serverTransport); tArgs.processor(processor); // 客户端协议要一致 tArgs.protocolFactory(new TBinaryProtocol.Factory()); TServer server = new TThreadPoolServer(tArgs); System.out.println("Hello TThreadPoolServer...."); server.serve(); // 启动服务 } catch (Exception e) { e.printStackTrace(); }}
/** * 客户端调用 * 阻塞 * Created by Administrator on 2017/1/12. */public class AdditionClient { public static void main(String[] args) { try { TTransport transport; // 设置传输通道 transport = new TSocket("localhost", 9090);//使用堵塞式I/O进行传输 transport.open(); // 协议要和服务端一致 // 使用二进制协议 TProtocol protocol = new TBinaryProtocol(transport); AdditionService.Client client = new AdditionService.Client(protocol); System.out.println(client.add(100, 200)); transport.close(); } catch (TTransportException e) { e.printStackTrace(); } catch (TException x) { x.printStackTrace(); } }}
/** * TNonblockingServer采用单线程非阻塞(NIO)的模式 * @param processor */public static void nonBlockingServer(AdditionService.Processor查看TNonblockingServer的源码,我们可以看到select()方法:processor) { try { // 传输通道 - 非阻塞方式 TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);// 设置服务端口 // 异步IO,需要使用TFramedTransport,它将分块缓存读取。 TNonblockingServer.Args tArgs = new TNonblockingServer.Args(serverTransport); tArgs.processor(processor); tArgs.transportFactory(new TFramedTransport.Factory()); // 使用高密度二进制协议 tArgs.protocolFactory(new TCompactProtocol.Factory()); // 使用非阻塞式IO,服务端和客户端需要指定TFramedTransport数据传输的方式 TServer server = new TNonblockingServer(tArgs); System.out.println("Starting the simple server..."); server.serve();// 启动服务 } catch (Exception e) { e.printStackTrace(); }}
private void select() { try { this.selector.select(); Iterator e = this.selector.selectedKeys().iterator(); while(!TNonblockingServer.this.stopped_ && e.hasNext()) { SelectionKey key = (SelectionKey)e.next(); e.remove(); if(!key.isValid()) { this.cleanupSelectionKey(key); } else if(key.isAcceptable()) { this.handleAccept(); } else if(key.isReadable()) { this.handleRead(key); } else if(key.isWritable()) { this.handleWrite(key); } else { TNonblockingServer.this.LOGGER.warn("Unexpected state in select! " + key.interestOps()); } } } catch (IOException var3) { TNonblockingServer.this.LOGGER.warn("Got an IOException while selecting!", var3); }}
/** * THsHaServer * THsHaServer类是TNonblockingServer类的子类 * @param processor */public static void startTHsHaServer(AdditionService.Processorprocessor) { try { // 传输通道 - 非阻塞方式 TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);// 设置服务端口 // 异步IO,需要使用TFramedTransport,它将分块缓存读取。 THsHaServer.Args tArgs = new THsHaServer.Args(serverTransport); tArgs.processor(processor); tArgs.transportFactory(new TFramedTransport.Factory()); // 使用高密度二进制协议 tArgs.protocolFactory(new TCompactProtocol.Factory()); // 使用非阻塞式IO,服务端和客户端需要指定TFramedTransport数据传输的方式 TServer server = new THsHaServer(tArgs); System.out.println("Starting the simple server..."); server.serve();// 启动服务 } catch (Exception e) { e.printStackTrace(); }}
/** * TThreadedSelectorServer * 是多线程服务器端使用非堵塞式I/O模型 * @param processor */ public static void startTThreadedSelectorServer(AdditionService.Processor上面三种非阻塞服务模型可以处理这样的客户端:processor) { try { // 传输通道 - 非阻塞方式 TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);// 设置服务端口 // 异步IO,需要使用TFramedTransport,它将分块缓存读取。 TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport); tArgs.processor(processor); tArgs.transportFactory(new TFramedTransport.Factory()); // 使用高密度二进制协议 tArgs.protocolFactory(new TCompactProtocol.Factory()); // 使用非阻塞式IO,服务端和客户端需要指定TFramedTransport数据传输的方式 TServer server = new TThreadedSelectorServer(tArgs); System.out.println("Starting the simple server..."); server.serve();// 启动服务 } catch (Exception e) { e.printStackTrace(); } }
public static void main(String[] args) { try { // 设置传输通道,对于非阻塞服务,需要使用TFramedTransport,它将数据分块发送 TSocket socket = new TSocket("localhost", 9090);//使用堵塞式I/O进行传输 //使用非阻塞方式,按块的大小,进行传输,类似于Java中的NIO TTransport transport = new TFramedTransport(socket); // 协议要和服务端一致 // 使用高密度二进制协议 TProtocol protocol = new TCompactProtocol(transport); AdditionService.Client client = new AdditionService.Client(protocol); transport.open(); System.out.println(client.add(100, 100)); transport.close(); } catch (TTransportException e) { e.printStackTrace(); } catch (TException x) { x.printStackTrace(); }}