构建多线程 Web 服务器
- 在 socket 上监听 TCP 连接
- 解析少量的 HTTP 请求
- 创建一个合适的 HTTP 响应
- 使用线程池改进服务器的吞吐量
- 优雅的停机和清理
- 注意:并不是最佳实践
创建项目
~/rust
➜ cargo new helloCreated binary (application) `hello` package~/rust
➜
main.rs 文件
use std::net::TcpListener;fn main() {let listener = TcpListener::bind("127.0.0.1:7878").unwrap();for stream in listener.incoming() {let stream = stream.unwrap();println!("Connection established!");}
}
修改一:
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;fn main() {let listener = TcpListener::bind("127.0.0.1:7878").unwrap();for stream in listener.incoming() {let stream = stream.unwrap();handle_connection(stream);}
}fn handle_connection(mut stream: TcpStream) {let mut buffer = [0; 512];stream.read(&mut buffer).unwrap();println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
}
修改二:
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;fn main() {let listener = TcpListener::bind("127.0.0.1:7878").unwrap();for stream in listener.incoming() {let stream = stream.unwrap();handle_connection(stream);}
}fn handle_connection(mut stream: TcpStream) {let mut buffer = [0; 512];stream.read(&mut buffer).unwrap();// 请求// Method Request-URI HTTP-Version CRLF// headers CRLF// message-body// 响应// HTTP-Version Status-Code Reason-Phrase CRLF// headers CRLF// message-bodylet response = "HTTP/1.1 200 OK\r\n\r\n";stream.write(response.as_bytes()).unwrap();stream.flush().unwrap();println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
}
修改三:
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;fn main() {let listener = TcpListener::bind("127.0.0.1:7878").unwrap();for stream in listener.incoming() {let stream = stream.unwrap();handle_connection(stream);}
}fn handle_connection(mut stream: TcpStream) {let mut buffer = [0; 512];stream.read(&mut buffer).unwrap();// 请求// Method Request-URI HTTP-Version CRLF// headers CRLF// message-body// 响应// HTTP-Version Status-Code Reason-Phrase CRLF// headers CRLF// message-bodylet contents = fs::read_to_string("hello.html").unwrap();let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);stream.write(response.as_bytes()).unwrap();stream.flush().unwrap();
}
修改四:
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;fn main() {let listener = TcpListener::bind("127.0.0.1:7878").unwrap();for stream in listener.incoming() {let stream = stream.unwrap();handle_connection(stream);}
}fn handle_connection(mut stream: TcpStream) {let mut buffer = [0; 512];stream.read(&mut buffer).unwrap();let get = b"GET / HTTP/1.1\r\n";if buffer.starts_with(get) {let contents = fs::read_to_string("hello.html").unwrap();let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);stream.write(response.as_bytes()).unwrap();stream.flush().unwrap();} else {let status_line = "HTTP/1.1 404 NOT FOUND\r\n\r\n";let contents = fs::read_to_string("404.html").unwrap();let response = format!("{}{}", status_line, contents);stream.write(response.as_bytes()).unwrap();stream.flush().unwrap();}
}
修改五:
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;fn main() {let listener = TcpListener::bind("127.0.0.1:7878").unwrap();for stream in listener.incoming() {let stream = stream.unwrap();handle_connection(stream);}
}fn handle_connection(mut stream: TcpStream) {let mut buffer = [0; 512];stream.read(&mut buffer).unwrap();let get = b"GET / HTTP/1.1\r\n";let (status_line, filename) = if buffer.starts_with(get) {("HTTP/1.1 200 OK\r\n\r\n", "hello.html")} else {("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")};let contents = fs::read_to_string(filename).unwrap();let response = format!("{}{}", status_line, contents);stream.write(response.as_bytes()).unwrap();stream.flush().unwrap();
}
hello.html 文件
<!DOCTYPE html>
<html lang="en"><head><meta charset="utf-8"><title>Hello</title></head><body><h1>Hello</h1><p>Hi from Rust</p></body>
</html>
404.html 文件
<!DOCTYPE html>
<html lang="en"><head><meta charset="utf-8"><title>Hello!</title></head><body><h1>Oops!</h1><p>Sorry, I don't know what you're asking for.</p></body>
</html>
单线程Web服务器
use std::fs;
use std::thread;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::time::Duration;fn main() {let listener = TcpListener::bind("127.0.0.1:7878").unwrap();for stream in listener.incoming() {let stream = stream.unwrap();handle_connection(stream);}
}fn handle_connection(mut stream: TcpStream) {let mut buffer = [0; 512];stream.read(&mut buffer).unwrap();let get = b"GET / HTTP/1.1\r\n";let sleep = b"GET /sleep HTTP/1.1\r\n";let (status_line, filename) = if buffer.starts_with(get) {("HTTP/1.1 200 OK\r\n\r\n", "hello.html")} else if buffer.starts_with(sleep) {thread::sleep(Duration::from_secs(5));("HTTP/1.1 200 OK\r\n\r\n", "hello.html")} else {("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")};let contents = fs::read_to_string(filename).unwrap();let response = format!("{}{}", status_line, contents);stream.write(response.as_bytes()).unwrap();stream.flush().unwrap();
}
开启线程
use std::fs;
use std::thread;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::time::Duration;fn main() {let listener = TcpListener::bind("localhost:7878").unwrap();for stream in listener.incoming() {let stream = stream.unwrap();thread::spawn(|| {handle_connection(stream);});}
}fn handle_connection(mut stream: TcpStream) {let mut buffer = [0; 512];stream.read(&mut buffer).unwrap();let get = b"GET / HTTP/1.1\r\n";let sleep = b"GET /sleep HTTP/1.1\r\n";let (status_line, filename) = if buffer.starts_with(get) {("HTTP/1.1 200 OK\r\n\r\n", "hello.html")} else if buffer.starts_with(sleep) {thread::sleep(Duration::from_secs(5));("HTTP/1.1 200 OK\r\n\r\n", "hello.html")} else {("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")};let contents = fs::read_to_string(filename).unwrap();let response = format!("{}{}", status_line, contents);stream.write(response.as_bytes()).unwrap();stream.flush().unwrap();
}
lib.rs 文件
use std::thread;pub struct ThreadPool {threads: Vec<thread::JoinHandle<()>>,
}impl ThreadPool {/// Creates a new ThreadPool./// /// The size is the number of threads in the pool./// /// # Panics/// /// The `new` function will panic if the size is zero.pub fn new(size: usize) -> ThreadPool {assert!(size > 0);let mut threads = Vec::with_capacity(size);for _ in 0..size {// create some threads and store them in the vector}ThreadPool { threads }}pub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{}
}
lib.rs 修改一
use std::thread;pub struct ThreadPool {workers: Vec<Worker>,
}impl ThreadPool {/// Creates a new ThreadPool.////// The size is the number of threads in the pool.////// # Panics////// The `new` function will panic if the size is zero.pub fn new(size: usize) -> ThreadPool {assert!(size > 0);let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id));}ThreadPool { workers }}pub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{}
}struct Worker {id: usize,thread: thread::JoinHandle<()>,
}impl Worker {fn new(id: usize) -> Worker {let thread = thread::spawn(|| {});Worker { id, thread }}
}
lib.rs 修改二
use std::thread;
use std::sync::mpsc;pub struct ThreadPool {workers: Vec<Worker>,sender: mpsc::Sender<Job>,
}struct Job;
impl ThreadPool {/// Creates a new ThreadPool.////// The size is the number of threads in the pool.////// # Panics////// The `new` function will panic if the size is zero.pub fn new(size: usize) -> ThreadPool {assert!(size > 0);let (sender, receiver) = mpsc::channel();let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, receiver));}ThreadPool { workers, sender }}pub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{}
}struct Worker {id: usize,thread: thread::JoinHandle<()>,
}impl Worker {fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {let thread = thread::spawn(|| {receiver;});Worker { id, thread }}
}
lib.rs
修改三
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;pub struct ThreadPool {workers: Vec<Worker>,sender: mpsc::Sender<Job>,
}struct Job;
impl ThreadPool {/// Creates a new ThreadPool.////// The size is the number of threads in the pool.////// # Panics////// The `new` function will panic if the size is zero.pub fn new(size: usize) -> ThreadPool {assert!(size > 0);let (sender, receiver) = mpsc::channel();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { workers, sender }}pub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{}
}struct Worker {id: usize,thread: thread::JoinHandle<()>,
}impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(|| {receiver;});Worker { id, thread }}
}
lib.rs
修改四
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;pub struct ThreadPool {workers: Vec<Worker>,sender: mpsc::Sender<Job>,
}// struct Job;
type Job = Box<FnOnce() + Send + 'static>;
impl ThreadPool {/// Creates a new ThreadPool.////// The size is the number of threads in the pool.////// # Panics////// The `new` function will panic if the size is zero.pub fn new(size: usize) -> ThreadPool {assert!(size > 0);let (sender, receiver) = mpsc::channel();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { workers, sender }}pub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{let job = Box::new(f);self.sender.send(job).unwrap();}
}struct Worker {id: usize,thread: thread::JoinHandle<()>,
}impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || loop {let job = receiver.lock().unwrap().recv().unwrap();println!("Worker {} got a job; executing.", id);(*job)();});Worker { id, thread }}
}
lib.rs
修改五
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;pub struct ThreadPool {workers: Vec<Worker>,sender: mpsc::Sender<Job>,
}// struct Job;
// type Job = Box<FnOnce() + Send + 'static>;
impl ThreadPool {/// Creates a new ThreadPool.////// The size is the number of threads in the pool.////// # Panics////// The `new` function will panic if the size is zero.pub fn new(size: usize) -> ThreadPool {assert!(size > 0);let (sender, receiver) = mpsc::channel();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { workers, sender }}pub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{let job = Box::new(f);self.sender.send(job).unwrap();}
}struct Worker {id: usize,thread: thread::JoinHandle<()>,
}trait FnBox {fn call_box(self: Box<Self>);
}impl<F: FnOnce()> FnBox for F {fn call_box(self: Box<F>) {(*self)()}
}type Job = Box<dyn FnBox + Send + 'static>;impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || loop {let job = receiver.lock().unwrap().recv().unwrap();println!("Worker {} got a job; executing.", id);// (*job)();job.call_box();});Worker { id, thread }}
}
lib.rs
修改六
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;pub struct ThreadPool {workers: Vec<Worker>,sender: mpsc::Sender<Job>,
}// struct Job;
// type Job = Box<FnOnce() + Send + 'static>;
impl ThreadPool {/// Creates a new ThreadPool.////// The size is the number of threads in the pool.////// # Panics////// The `new` function will panic if the size is zero.pub fn new(size: usize) -> ThreadPool {assert!(size > 0);let (sender, receiver) = mpsc::channel();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { workers, sender }}pub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{let job = Box::new(f);self.sender.send(job).unwrap();}
}struct Worker {id: usize,thread: thread::JoinHandle<()>,
}trait FnBox {fn call_box(self: Box<Self>);
}impl<F: FnOnce()> FnBox for F {fn call_box(self: Box<F>) {(*self)()}
}type Job = Box<dyn FnBox + Send + 'static>;impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || loop {while let Ok(job) = receiver.lock().unwrap().recv() {println!("Worker {} got a job; executing.", id);job.call_box();}});Worker { id, thread }}
}
优雅的停机和清理
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;pub struct ThreadPool {workers: Vec<Worker>,sender: mpsc::Sender<Job>,
}// struct Job;
// type Job = Box<FnOnce() + Send + 'static>;
impl ThreadPool {/// Creates a new ThreadPool.////// The size is the number of threads in the pool.////// # Panics////// The `new` function will panic if the size is zero.pub fn new(size: usize) -> ThreadPool {assert!(size > 0);let (sender, receiver) = mpsc::channel();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { workers, sender }}pub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{let job = Box::new(f);self.sender.send(job).unwrap();}
}impl Drop for ThreadPool {fn drop(&mut self) {for worker in &mut self.workers {println!("Shutting down worker {}", worker.id);worker.thread.join().unwrap()}}
}struct Worker {id: usize,thread: thread::JoinHandle<()>,
}trait FnBox {fn call_box(self: Box<Self>);
}impl<F: FnOnce()> FnBox for F {fn call_box(self: Box<F>) {(*self)()}
}type Job = Box<dyn FnBox + Send + 'static>;impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || loop {while let Ok(job) = receiver.lock().unwrap().recv() {println!("Worker {} got a job; executing.", id);job.call_box();}});Worker { id, thread }}
}
修改优化一:
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;pub struct ThreadPool {workers: Vec<Worker>,sender: mpsc::Sender<Job>,
}// struct Job;
// type Job = Box<FnOnce() + Send + 'static>;
impl ThreadPool {/// Creates a new ThreadPool.////// The size is the number of threads in the pool.////// # Panics////// The `new` function will panic if the size is zero.pub fn new(size: usize) -> ThreadPool {assert!(size > 0);let (sender, receiver) = mpsc::channel();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { workers, sender }}pub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{let job = Box::new(f);self.sender.send(job).unwrap();}
}impl Drop for ThreadPool {fn drop(&mut self) {for worker in &mut self.workers {println!("Shutting down worker {}", worker.id);if let Some(thread) = worker.thread.take() {thread.join().unwrap();}}}
}struct Worker {id: usize,thread: Option<thread::JoinHandle<()>>,
}trait FnBox {fn call_box(self: Box<Self>);
}impl<F: FnOnce()> FnBox for F {fn call_box(self: Box<F>) {(*self)()}
}type Job = Box<dyn FnBox + Send + 'static>;impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || loop {while let Ok(job) = receiver.lock().unwrap().recv() {println!("Worker {} got a job; executing.", id);job.call_box();}});Worker {id,thread: Some(thread),}}
}
最终版 lib.rs 文件
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;enum Message {NewJob(Job),Terminate,
}pub struct ThreadPool {workers: Vec<Worker>,sender: mpsc::Sender<Message>,
}// struct Job;
// type Job = Box<FnOnce() + Send + 'static>;
impl ThreadPool {/// Creates a new ThreadPool.////// The size is the number of threads in the pool.////// # Panics////// The `new` function will panic if the size is zero.pub fn new(size: usize) -> ThreadPool {assert!(size > 0);let (sender, receiver) = mpsc::channel();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { workers, sender }}pub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{let job = Box::new(f);self.sender.send(Message::NewJob(job)).unwrap();}
}impl Drop for ThreadPool {fn drop(&mut self) {println!("Sending terminate message to all workers.");for _ in &mut self.workers {self.sender.send(Message::Terminate).unwrap();}println!("Shutting down all workers.");for worker in &mut self.workers {println!("Shutting down worker {}", worker.id);if let Some(thread) = worker.thread.take() {thread.join().unwrap();}}}
}struct Worker {id: usize,thread: Option<thread::JoinHandle<()>>,
}trait FnBox {fn call_box(self: Box<Self>);
}impl<F: FnOnce()> FnBox for F {fn call_box(self: Box<F>) {(*self)()}
}type Job = Box<dyn FnBox + Send + 'static>;impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {let thread = thread::spawn(move || loop {let message = receiver.lock().unwrap().recv().unwrap();match message {Message::NewJob(job) => {println!("Worker {} got a job; executing.", id);job.call_box();}Message::Terminate => {println!("Worker {} got a job; executing.", id);break;}}});Worker {id,thread: Some(thread),}}
}
最终版 main.rs 文件
use hello::ThreadPool;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;fn main() {let listener = TcpListener::bind("localhost:7878").unwrap();let pool = ThreadPool::new(4);for stream in listener.incoming().take(2) {let stream = stream.unwrap();pool.execute(|| {handle_connection(stream);});}println!("Shutting down.");
}fn handle_connection(mut stream: TcpStream) {let mut buffer = [0; 512];stream.read(&mut buffer).unwrap();let get = b"GET / HTTP/1.1\r\n";let sleep = b"GET /sleep HTTP/1.1\r\n";let (status_line, filename) = if buffer.starts_with(get) {("HTTP/1.1 200 OK\r\n\r\n", "hello.html")} else if buffer.starts_with(sleep) {thread::sleep(Duration::from_secs(5));("HTTP/1.1 200 OK\r\n\r\n", "hello.html")} else {("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")};let contents = fs::read_to_string(filename).unwrap();let response = format!("{}{}", status_line, contents);stream.write(response.as_bytes()).unwrap();stream.flush().unwrap();
}
运行
hello on master [?] is 📦 0.1.0 via 🦀 1.67.1
➜ cargo run Compiling hello v0.1.0 (/Users/qiaopengjun/rust/hello)Finished dev [unoptimized + debuginfo] target(s) in 0.43sRunning `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Sending terminate message to all workers.
Shutting down all workers.
Shutting down worker 0
Worker 1 got a job; executing.
Worker 2 got a job; executing.
Worker 3 got a job; executing.
Worker 1 got a job; executing.
Worker 0 got a job; executing.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3hello on master [?] is 📦 0.1.0 via 🦀 1.67.1 took 21.9s
➜