【Java】聊天程序综合案例
创建服务端
在类中添加消息队列及Socket集合
因为需要给所有客户端发送消息,所以服务器端必须持有所有客户端Socket的集合
生产和消费消息数据需要一个消息队列,所以服务器还必须定义一个消息队列
package edu.xalead.server;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class chatServer {
/**
* 所有客户端连接集合
*/
private ConcurrentHashMap<String, Socket> allCustomer = new ConcurrentHashMap<>();
/***
* 存放消息的队列
*/
private ConcurrentLinkedQueue<String> messageQueue = new ConcurrentLinkedQueue<>();
}
创建接收线程
离开ChatServer类没有利用价值,所以我这里写成内部类
/**
* 创建接受线程
* 离开ChatServer后失去价值,故使用内部类
* 作用:只做接受消息,放进消息队列
*/
private class ReceiveService extends Thread{
/**
* 必须持有消息队列的引用
*/
//如果需要创建效果更强的外部类则对于消息队列需要创建接口
// private ConcurrentLinkedQueue<String> messageQueue=null;
// private ReceiveService(ConcurrentLinkedQueue<String> messageQueue){
// this.messageQueue=messageQueue;
// }
//客户端的套接字
private Socket client=null;
public ReceiveService(Socket client) {
this.client = client;
}
public void run(){
BufferedReader br=null;
try {
//注意socket只能得到字节流,所以要把它包装成字符流得用InputStreamReader包装一下
br=new BufferedReader(
new InputStreamReader(client.getInputStream()));
while (true) {
//接收消息
System.out.println("等待接收客户端【"+client.getInetAddress().getHostAddress()
+"】消息");
String mesg=br.readLine();
System.out.println("接收到客户端【"+client.getInetAddress().getHostAddress()
+"】消息");
//放入消息队列
synchronized (messageQueue) {
messageQueue.offer(mesg);
messageQueue.notify();
}
//接受下一条
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
.接收客户消息
每个接收线程只能为一个特定客户服务,必须持有这个客户的Socket,所以在接收线程中添加客户的 Socket引用
//客户端的套接字
private Socket client=null;
public ReceiveService(Socket client) {
this.client = client;
}
接收线程中的Socket怎么得到呢?
显然,需要编写监听客户端的代码吧 4.添加监听客户端连接的代码
private static final int port=9999;
/**
* 监听
*/
public void start(){
//启动发送消息线程
new Thread(new SendService()).start();
ServerSocket serverSocket=null;
Socket client=null;
try {
//申请端口
serverSocket =new ServerSocket(port);
while (true) {
//监听
System.out.println("开始监听新的客户端连接 。。。。");
client=serverSocket.accept();
System.out.println("监听到客户端【"+client.getInetAddress().getHostAddress()
+":"+client.getPort()+"】");
//提供消息服务
new ReceiveService(client).start();
//把socket放进客户socket集合,以便发送线程使用
String key=client.getInetAddress().getHostAddress()+":"+client.getPort();
System.out.println(key);
allCustomer.put(key,client);
//监听下一个
}
} catch (Exception e) {
e.printStackTrace();
}
}
定义发送线程
/**
* 创建发送线程
*/
private class SendService implements Runnable{
@Override
public void run() {
try {
PrintWriter pw=null;
while (true) {
//取消息队列中的消息
String mesg=messageQueue.poll();//poll取一个删一个
synchronized (messageQueue) {
if(mesg!=null) {
//遍历客户端连接
for (Socket socket : allCustomer.values()) {
//创建字符输出流半配网络字节流
pw = new PrintWriter(socket.getOutputStream());
//向客户端发送消息
pw.println(mesg);
pw.flush();
}
}else {
//休息
messageQueue.wait();
}
}
}
//到队列里取下一条消息
} catch (Exception e) {
e.printStackTrace();
}
}
}
启动发送消息线程
完整服务端代码
package edu.xalead.server;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class chatServer {
private static final int port=9999;
/**
* 监听
*/
public void start(){
//启动发送消息线程
new Thread(new SendService()).start();
ServerSocket serverSocket=null;
Socket client=null;
try {
//申请端口
serverSocket =new ServerSocket(port);
while (true) {
//监听
System.out.println("开始监听新的客户端连接 。。。。");
client=serverSocket.accept();
System.out.println("监听到客户端【"+client.getInetAddress().getHostAddress()
+":"+client.getPort()+"】");
//提供消息服务
new ReceiveService(client).start();
//把socket放进客户socket集合,以便发送线程使用
String key=client.getInetAddress().getHostAddress()+":"+client.getPort();
System.out.println(key);
allCustomer.put(key,client);
//监听下一个
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 所有客户端连接集合
*/
private ConcurrentHashMap<String, Socket> allCustomer = new ConcurrentHashMap<>();
/***
* 存放消息的队列
*/
private ConcurrentLinkedQueue<String> messageQueue = new ConcurrentLinkedQueue<>();
/**
* 创建发送线程
*/
private class SendService implements Runnable{
@Override
public void run() {
try {
PrintWriter pw=null;
while (true) {
//取消息队列中的消息
String mesg=messageQueue.poll();//poll取一个删一个
synchronized (messageQueue) {
if(mesg!=null) {
//遍历客户端连接
for (Socket socket : allCustomer.values()) {
//创建字符输出流半配网络字节流
pw = new PrintWriter(socket.getOutputStream());
//向客户端发送消息
pw.println(mesg);
pw.flush();
}
}else {
//休息
messageQueue.wait();
}
}
}
//到队列里取下一条消息
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 创建接受线程
* 离开ChatServer后失去价值,故使用内部类
* 作用:只做接受消息,放进消息队列
*/
private class ReceiveService extends Thread{
/**
* 必须持有消息队列的引用
*/
//如果需要创建效果更强的外部类则对于消息队列需要创建接口
// private ConcurrentLinkedQueue<String> messageQueue=null;
// private ReceiveService(ConcurrentLinkedQueue<String> messageQueue){
// this.messageQueue=messageQueue;
// }
//客户端的套接字
private Socket client=null;
public ReceiveService(Socket client) {
this.client = client;
}
public void run(){
BufferedReader br=null;
try {
//注意socket只能得到字节流,所以要把它包装成字符流得用InputStreamReader包装一下
br=new BufferedReader(
new InputStreamReader(client.getInputStream()));
while (true) {
//接收消息
System.out.println("等待接收客户端【"+client.getInetAddress().getHostAddress()
+"】消息");
String mesg=br.readLine();
System.out.println("接收到客户端【"+client.getInetAddress().getHostAddress()
+"】消息");
//放入消息队列
synchronized (messageQueue) {
messageQueue.offer(mesg);
messageQueue.notify();
}
//接受下一条
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
思考线程协作的问题
如果不考虑线程协作,那么发送消息线程在消息队列为空的时候仍然会做无意义循环,浪费宝贵的CPU 时间片
所以我们要用线程协作解决这个问题。首先要添加同步块,因为消息队列是所有线程监控的同一对象, 所以用它作为同步监视器
切记要注意同步块的范围,如果同步锁定紫色框选范围,则只要有一个线程br.readLine()会等待客户消 息,导致所有接收消息的线程无法进入同步块,无法执行接收消息的工作
最后添加协作代码
当消息队列为空时,发送线程进入休眠状态
当接收消息线程接收到消息并放入消息队列,则唤醒发送线程
.我们准备把传输数据改为json传输
创建VO对象
package edu.xalead.vo;
import java.util.Date;
public class MessageVO {
//vo(view object)
private String mesg;
private Date date;
public MessageVO(){
}
public MessageVO(String mesg, Date date) {
this.mesg = mesg;
this.date = date;
}
@Override
public String toString() {
return "MessageVO{" +
"mesg='" + mesg + '\'' +
", date=" + date +
'}';
}
public String getMesg() {
return mesg;
}
public void setMesg(String mesg) {
this.mesg = mesg;
}
public Date getDate() {
return date;
}
public void setDate(Date date) {
this.date = date;
}
}
创建JSON和对象互转工具类
package edu.xalead.util;
import net.sf.json.JSONObject;
public class JSONUtil {
/**
* 对象转json的方法
* @return
*/
public static String obj2json(Object obj){
JSONObject ob=JSONObject.fromObject(obj);
return ob.toString();
}
/**
* 把json串转成对象的方法
*/
public static <T> T json2obj(String jsonStr,Class<T> t){
JSONObject object=JSONObject.fromObject(jsonStr);
return (T)JSONObject.toBean(object,t);
}
}
______
测试代码
package test.edu.xalead;
import edu.xalead.util.JSONUtil;
import net.sf.json.JSONObject;
public class TestJSONUtil {
@org.junit.Test
public void test1(){
//创建学生对象
Student s=new Student();
s.setNo(222);
s.setAge(20);
s.setName("zhansgan");
Address adr=new Address();
adr.setHomeadr("未央区");
adr.setSchooladr("碑林区");
s.setAddress(adr);
System.out.println(JSONUtil.obj2json(s));
String jsonStr="{\"address\":{\"homeadr\":\"央区\",\"schooladr\":\"碑林区\"},\"age\":20,\"name\":\"zhansgan\",\"no\":22}";
Student ss=JSONUtil.json2obj(jsonStr,Student.class);
System.out.println(ss);
}
}
编译结果
编写服务端启动类
package edu.xalead.server;
public class ServerStart {
public static void main(String[] args) {
new chatServer().start();
}
}
写客户端
客户端知道服务器的地址和端口,先编写客户端类直连服务
package edu.xalead.client;
import edu.xalead.util.JSONUtil;
import edu.xalead.vo.MessageVO;
import javax.xml.crypto.Data;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Date;
import java.util.Scanner;
public class ChatClient {
/**
* 服务器地址
*/
private String addr="127.0.0.1";
Socket s=null;
/**
* 聊天服务端口
*/
private int port=9999;
public void start(){
try {
//客户知道服务器的地址和端口,直接创建套接字
s=new Socket(addr,port);
//启动两个监听服务线程
new ReceiveService().start();
new SendService().start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
客户端要做两件事:
1.监听服务器返回的消息,并输出到控制台
2.监听键盘消息,并发向服务器 很显然,这里需要两个客户线程
创建客户端接收线程
监听服务器返回的消息,并输出到控制台,因为离开客户端没有复用价值,所以我们也是写成 ChatClient类的内部类
/**
* 创建监听键盘消息
*/
private class SendService extends Thread{
private PrintWriter pw=null;
public void run(){
try {
while (true) {
Scanner scanner =new Scanner(System.in);
//接受键盘消息
String mesg=scanner.nextLine();
//封装MessageVO
MessageVO vo= new MessageVO(mesg, new Date());
//解析成json串
String jsonStr=JSONUtil.obj2json(vo);
//发送到服务器
pw=new PrintWriter(s.getOutputStream());
pw.println(jsonStr);
pw.flush();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
监听键盘消息,并发向服务器
/**
* 创建监听服务器消息线程
*/
private class ReceiveService extends Thread{
private BufferedReader br=null;
public void run(){
try {
while (true) {
br=new BufferedReader(
new InputStreamReader(s.getInputStream()));
//监听服务器发送过来的json字符串
String jsonStr =br.readLine();
//json串转换成对象
MessageVO mvo= JSONUtil.json2obj(jsonStr,MessageVO.class);
//在控制台输出
System.out.println("info:"+mvo.getMesg()+"【时间:"+mvo.getDate()+"】");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
最后,客户端要启动两个服务线程
两个监听线程均依赖网络套接字,所以启动线程的代码写在创建套接后就可以
最后,编写客户端的启动类
package edu.xalead.client;
public class ClientStart {
public static void main(String[] args) {
new ChatClient().start();
}
}
结果
还没有评论,来说两句吧...