From 6fe2f12b8e16517a667536295d0f2aa78ae8ee76 Mon Sep 17 00:00:00 2001 From: Fasterino Date: Fri, 31 Oct 2025 04:27:42 +0300 Subject: [PATCH] Release 0.1.0 --- .dockerignore | 3 + .gitignore | 3 + Cargo.toml | 37 +++ Dockerfile | 16 + README.md | 110 +++++++ docker-compose.yml | 13 + icon.svg | 26 ++ schema.sql | 13 + src/check/mod.rs | 204 +++++++++++++ src/context.rs | 63 ++++ src/db.rs | 32 ++ src/docker/bollard.rs | 133 +++++++++ src/docker/mod.rs | 204 +++++++++++++ src/lib.rs | 18 ++ src/logger.rs | 25 ++ src/main.rs | 3 + src/models.rs | 37 +++ src/prelude.rs | 30 ++ src/runner/mod.rs | 48 +++ src/runner/tokio.rs | 32 ++ src/server/actix_listener.rs | 297 +++++++++++++++++++ src/server/mod.rs | 245 +++++++++++++++ src/server/reqwest_proxy.rs | 56 ++++ src/server/tokio_tungstenite_socket_proxy.rs | 93 ++++++ src/service.rs | 24 ++ src/settings.rs | 36 +++ src/test/mod.rs | 24 ++ src/tls/certbot_updater.rs | 53 ++++ src/tls/mod.rs | 133 +++++++++ src/tls/ssl_expiration_checker.rs | 20 ++ 30 files changed, 2031 insertions(+) create mode 100644 .dockerignore create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 docker-compose.yml create mode 100644 icon.svg create mode 100644 schema.sql create mode 100644 src/check/mod.rs create mode 100644 src/context.rs create mode 100644 src/db.rs create mode 100644 src/docker/bollard.rs create mode 100644 src/docker/mod.rs create mode 100644 src/lib.rs create mode 100644 src/logger.rs create mode 100644 src/main.rs create mode 100644 src/models.rs create mode 100644 src/prelude.rs create mode 100644 src/runner/mod.rs create mode 100644 src/runner/tokio.rs create mode 100644 src/server/actix_listener.rs create mode 100644 src/server/mod.rs create mode 100644 src/server/reqwest_proxy.rs create mode 100644 src/server/tokio_tungstenite_socket_proxy.rs create mode 100644 src/service.rs create mode 100644 src/settings.rs create mode 100644 src/test/mod.rs create mode 100644 src/tls/certbot_updater.rs create mode 100644 src/tls/mod.rs create mode 100644 src/tls/ssl_expiration_checker.rs diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..2ede84f --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +/target +/test* +Cargo.lock \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2ede84f --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/target +/test* +Cargo.lock \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..5ad4a6e --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "rrpsfd" +version = "0.1.0" +edition = "2024" + +[dependencies] +# Runner +tokio = { version = "1.48.0", features = ["full"] } + +# Logger +chrono = "0.4.42" + +# Database +ormlite = { version = "0.24.1", features = ["sqlite"] } +sqlx = "0.8.6" + +# Docker +bollard = "0.19.3" +futures-util = "0.3.31" +async-trait = "0.1.89" + +# Server [Listener] +actix-web = {version="4.11.0", features = ["openssl"]} +actix-ws = "0.3.0" +actix-http = { version = "3.11.2", features = ["ws"] } +openssl = "0.10.74" + +# Server [Proxy] +reqwest = "0.12.24" +bytes = "1.10.1" + +# Server [Socket Proxy] + +tokio-tungstenite = "0.28.0" + +# Tls +ssl-expiration2 = "0.4.0" diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..d3805e1 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,16 @@ +FROM rust:bullseye AS builder +WORKDIR /usr/src/rrpsfd +COPY Cargo.toml . 
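+# Build against a stub main.rs first so the dependency layer is cached;
+# the real sources are copied and compiled in the following steps.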
+RUN mkdir src && echo "fn main() {}" > src/main.rs && cargo build --release && rm -rf src +COPY . . +RUN cargo install --path . + +FROM debian:bullseye-slim +WORKDIR /home +RUN apt-get update && apt-get install -y certbot && rm -rf /var/lib/apt/lists/* +COPY --from=builder /usr/local/cargo/bin/rrpsfd /usr/local/bin/rrpsfd +COPY schema.sql . +VOLUME [ "/var/run/docker.sock", "/home/data", "/etc/letsencrypt/" ] +EXPOSE 80/tcp +EXPOSE 443/tcp +CMD ["rrpsfd", "--app"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..adc2fff --- /dev/null +++ b/README.md @@ -0,0 +1,110 @@ +![RRPSFD](./icon.svg "Optional title for the image") +> The easiest way to split network traffic by domain +> +> RRPSFD stands for Rust Reverse Proxy - Special for Docker +> +> .. contains 99% pure rust (1% is a certbot dependency) .. + +## About + +RRPSFD is a lightweight and efficient reverse proxy that allows you to split network traffic based on domain names. It is designed to be easy to use and configure, making it the perfect solution for anyone looking to link their docker containers to specific domains. + +## Quick Start + +Create `docker-compose.yml` file with the following content: + +```yaml +services: + rrpsfd: + image: git.3spikes.space/docker/rrpsfd:latest + container_name: rrpsfd + network_mode: "host" + restart: always + environment: + EMAIL: # Type your email here + volumes: + # - "/var/run/docker.sock:/var/run/docker.sock" # Uncomment this if you use Docker in root mode. + # - "~/.docker/run/docker.sock:/var/run/docker.sock" # Uncomment this if you use Docker in user mode (like Docker Desktop). + - "./rrpsfd/data:/home/data" + - "./rrpsfd/letsencrypt:/etc/letsencrypt/" +``` + +> !!! Don't forget to replace `# Type your email here` with your actual email address and uncomment the volume mount line with the correct path. + +Run `docker compose up -d` or `docker-compose up -d` + +Check logs with `docker compose logs rrpsfd` and if no errors, proceed to the next step. + +## Adding Services (docker compose) + +In `docker-compose.yml` file, add labels for your service with format: + +```yaml + labels: + rrpsfd:domain: port of your application +``` + +### Example: + +You have service: + +```yaml +services: + test: + image: your-image:latest + ports: + - "8080:3000" +``` + +And you want to link it with `example.com` and `subdomain.other-example.com` domains. Just remake the service with the following labels: + +```yaml +services: + test: + image: your-image:latest + labels: + rrpsfd:example.com: 3000 + rrpsfd:subdomain.other-example.com: 3000 +``` + +Run `docker compose up -d` and your service will be available at `example.com` and `subdomain.other-example.com`. SSL certificates will be automatically generated and renewed. + +I'm using this project to manage my services. +For example, I have [Minesweeper](https://git.3spikes.space/ROFLS/minesweeper-py) that have next `docker-compose.yml` file: + +```yaml +services: + main: + build: . + restart: unless-stopped + labels: + rrpsfd:ms.3spikes.space: 80 # link to rrpsfd instead of ports: - "80:80" +``` + +## CLI + +If you want to manage your services from the command line, you can use the `docker exec -it rrpsfd rrpsfd`. It provides a simple interface to manage your services, including adding, removing, and updating services (Not only those running in Docker). + +If you just want to check connected services, run previous command without `-it` (`docker exec rrpsfd rrpsfd`). 
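+
+For illustration, an interactive session could look like this (`example.com` and `172.17.0.5:3000` are placeholder values; the command syntax is the one printed by the prompt):
+
+```
+add example.com http://172.17.0.5:3000
+Ok! The SSL certificate will be ready in an hour. If you don't want to wait, restart the container.
+remove example.com
+Ok!
+exit
+```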
+ +## Features + +- Easy to use and configure +- Lightweight and efficient +- Generates SSL certificates automatically +- Automatic and manual proxy management +- Supports websockets +- Forkable, it's possible because it's written using Service pattern, add your own services to the RRPSFD by implementing the `Service` trait. + + +# Roadmap + +- [x] Http 1 Reverse proxy +- [x] Docker integration +- [x] CLI tools +- [x] Letsencrypt integration +- [x] WebSockets support +- [ ] Hostname override feature +- [ ] Http 2 Reverse proxy +- [ ] Http 3 Reverse proxy +- [ ] Custom Protocols Reverse Proxy diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..5257794 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,13 @@ +services: + rrpsfd: + build: . + container_name: rrpsfd + network_mode: "host" + restart: always + environment: + EMAIL: # Type your email here + volumes: + # - "/var/run/docker.sock:/var/run/docker.sock" # Uncomment this if you use Docker in root mode. + # - "~/.docker/run/docker.sock:/var/run/docker.sock" # Uncomment this if you use Docker in user mode (like Docker Desktop). + - "./rrpsfd/data:/home/data" + - "./rrpsfd/letsencrypt:/etc/letsencrypt/" diff --git a/icon.svg b/icon.svg new file mode 100644 index 0000000..ace6a61 --- /dev/null +++ b/icon.svg @@ -0,0 +1,26 @@ + + + + RRPSFD + + + + + + + + + + + + + \ No newline at end of file diff --git a/schema.sql b/schema.sql new file mode 100644 index 0000000..b1a4918 --- /dev/null +++ b/schema.sql @@ -0,0 +1,13 @@ + +CREATE TABLE IF NOT EXISTS connection ( + domain TEXT PRIMARY KEY NOT NULL, + protocol TEXT NOT NULL, + ip_address TEXT NOT NULL, + port INTEGER NOT NULL +); + +CREATE TABLE IF NOT EXISTS docker_container ( + id TEXT PRIMARY KEY NOT NULL, + name TEXT NOT NULL, + ip_address TEXT +); diff --git a/src/check/mod.rs b/src/check/mod.rs new file mode 100644 index 0000000..80ff71e --- /dev/null +++ b/src/check/mod.rs @@ -0,0 +1,204 @@ +use std::{ + fmt::Display, + io::{IsTerminal, Stdin, stdin}, + sync::Arc, +}; + +use ormlite::Model; +use sqlx::SqliteConnection; + +use crate::{ + context::Context, + log, log_err, log_warn, + models::{Connection, DockerContainer}, + service::Service, + settings::{AsyncMutexGuard, Result}, +}; + +pub struct CheckService; + +#[async_trait::async_trait] +impl Service for CheckService { + fn new() -> Arc + where + Self: Sized, + { + Self.into() + } + + async fn run_inner(self: Arc, context: Context) -> Result<()> { + let stdio = stdin(); + let is_terminal = stdio.is_terminal(); + loop { + let mut conn = context.db().await; + log!("Loading database..."); + + log!("Getting docker containers..."); + let containers = DockerContainer::select().fetch_all(&mut *conn).await?; + let containers_str = containers + .iter() + .map(|x| x.to_string()) + .collect::>() + .join("\n"); + log!( + "Found {} containers\n{}\n", + containers.len(), + containers_str + ); + + log!("Getting connections..."); + let connections = Connection::select().fetch_all(&mut *conn).await?; + let connections_str = connections + .iter() + .map(|x| x.to_string()) + .collect::>() + .join("\n"); + log!( + "Found {} connections\n{}\n", + connections.len(), + connections_str + ); + + if !is_terminal { + log!( + "If you want to add or remove connections run command:\ndocker exec -it rrpsfd" + ); + break; + } + + match CheckCommand::poll(&stdio, conn).await { + Ok(true) => break, + Ok(false) => continue, + Err(e) => { + log_err!("{e}\n\n") + } + } + } + + Ok(()) + } + + fn service_name(&self) -> &'static str { + 
"Test" + } +} + +enum CheckCommand { + AddConnection, + RemoveConnection, + Exit, +} + +impl Display for CheckCommand { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CheckCommand::AddConnection => { + f.write_str("add ://: - Add conection with specified domain and link") + } + CheckCommand::RemoveConnection => f.write_str("remove - Remove conection with specified domain"), + CheckCommand::Exit => f.write_str("exit - Exit from service mode>"), + } + } +} + +impl CheckCommand { + pub async fn poll(stdio: &Stdin, conn: AsyncMutexGuard<'_, SqliteConnection>) -> Result { + log!( + "Enter one of this command:\n{}\n{}\n{}\n", + Self::AddConnection, + Self::RemoveConnection, + Self::Exit + ); + let mut input = String::new(); + stdio.read_line(&mut input)?; + let mut parts = input.trim().split(' '); + let command = + Self::get_command(parts.next().ok_or("No input!")?).ok_or("Wrong command!")?; + + match command { + CheckCommand::AddConnection => { + let domain = parts.next().ok_or("No domain specified!")?; + let link = parts.next().ok_or("No link specified!")?; + let (protocol, parts) = link.split_once("://").ok_or("Wrong link format!")?; + if protocol != "http" && protocol != "https" { + return Err("Supported protocols: http, https".into()); + } + let (host, port) = parts.split_once(":").ok_or("Wrong link format!")?; + let port: u16 = port.parse().map_err(|_| "Wrong link format!")?; + + Self::add_connection( + conn, + domain.to_owned(), + protocol.to_owned(), + host.to_owned(), + port, + ) + .await?; + log!( + "Ok! The SSL certificate will be ready in an hour. If you don't want to wait, restart the container.\n\n" + ); + Ok(false) + } + CheckCommand::RemoveConnection => { + let domain = parts.next().ok_or("No domain specified!")?; + + Self::remove_connection(stdio, conn, domain.to_owned()).await?; + log!("Ok!\n\n"); + Ok(false) + } + CheckCommand::Exit => Ok(true), + } + } + + fn get_command(command: &str) -> Option { + match command { + "add" => Some(Self::AddConnection), + "remove" => Some(Self::RemoveConnection), + "exit" => Some(Self::Exit), + _ => None, + } + } + + async fn add_connection( + mut conn: AsyncMutexGuard<'_, SqliteConnection>, + domain: String, + protocol: String, + host: String, + port: u16, + ) -> Result<()> { + Connection { + domain, + protocol, + ip_address: host, + port, + } + .insert(&mut *conn) + .await?; + Ok(()) + } + async fn remove_connection( + stdio: &Stdin, + mut conn: AsyncMutexGuard<'_, SqliteConnection>, + domain: String, + ) -> Result<()> { + let connection = Connection::fetch_one(domain, &mut *conn).await?; + if let Ok(container) = DockerContainer::select() + .where_("ip_address = ?") + .bind(&connection.ip_address) + .fetch_one(&mut *conn) + .await + { + log_warn!( + "This connection is part of container with name {}, type 'confirm' if you want to delete this connection:", + container.name + ); + let mut confirm = String::new(); + stdio.read_line(&mut confirm)?; + if !confirm.trim().eq_ignore_ascii_case("confirm") { + return Err("Deletion was rejected".into()); + } + } + connection.delete(&mut *conn).await?; + Ok(()) + } +} diff --git a/src/context.rs b/src/context.rs new file mode 100644 index 0000000..8099572 --- /dev/null +++ b/src/context.rs @@ -0,0 +1,63 @@ +use std::{collections::HashMap, sync::Arc}; + +use ormlite::{Model, sqlite::SqliteConnection}; + +use crate::{ + db::Db, + log_err, + models::Connection, + settings::{AsyncMutex, AsyncMutexGuard, Result}, +}; + +#[async_trait::async_trait] +pub trait Event: 
Send + Sync { + async fn invoke(&self, context: &Context) -> Result<()>; +} + +#[derive(Clone)] +pub struct Context { + db: Arc>, + certificates: Arc>>, + on_update_ssl: Arc>>>, +} +impl Context { + pub async fn new() -> Self { + Context { + db: Db::connect().await, + certificates: AsyncMutex::new(HashMap::new()).into(), + on_update_ssl: AsyncMutex::new(None).into(), + } + } + + pub async fn db(&'_ self) -> AsyncMutexGuard<'_, SqliteConnection> { + self.db.lock().await + } + + pub async fn get_connection(&'_ self, domain: &str) -> Option { + let mut conn = self.db().await; + Connection::fetch_one(domain, &mut *conn).await.ok() + } + + pub async fn on_update_ssl(&self, event: Arc) { + let mut on_update_ssl = self.on_update_ssl.lock().await; + *on_update_ssl = Some(event.clone()); + } + + pub async fn update_ssl(&self) { + let on_update_ssl = self.on_update_ssl.lock().await; + if let Some(event) = on_update_ssl.clone() { + if let Err(err) = event.invoke(self).await { + log_err!("Failed to update SSL: {}", err); + } + } + } + pub async fn add_cert(&self, domain: String, cert: (String, String)) { + let mut certificates = self.certificates.lock().await; + certificates.insert(domain, cert); + } + + pub fn get_cert(&self, domain: &str) -> Option<(String, String)> { + let certificates = self.certificates.try_lock().ok()?; + certificates.get(domain).cloned() + } +} diff --git a/src/db.rs b/src/db.rs new file mode 100644 index 0000000..7500a0e --- /dev/null +++ b/src/db.rs @@ -0,0 +1,32 @@ +use std::{fs, sync::Arc}; + +use ormlite::{ + Connection, + sqlite::{SqliteConnectOptions, SqliteConnection}, +}; + +use crate::settings::AsyncMutex; + +pub struct Db; + +const DB_PATH: &str = "./data/rs.db"; + +impl Db { + pub async fn connect() -> Arc> { + let options = SqliteConnectOptions::new() + .filename(DB_PATH) + .create_if_missing(true); + let mut db = SqliteConnection::connect_with(&options) + .await + .expect("Cannot connect to db"); + + let sql = fs::read_to_string("./schema.sql").expect("Cannot read schema file"); + + sqlx::query(&sql) + .execute(&mut db) + .await + .expect("Cannot initialize database"); + + AsyncMutex::new(db).into() + } +} diff --git a/src/docker/bollard.rs b/src/docker/bollard.rs new file mode 100644 index 0000000..f5737e5 --- /dev/null +++ b/src/docker/bollard.rs @@ -0,0 +1,133 @@ +use std::{pin::Pin, sync::Arc}; + +use bollard::{ + Docker, + errors::Error, + query_parameters::{EventsOptionsBuilder, InspectContainerOptions}, + secret::{ContainerInspectResponse, EventMessage}, +}; +use futures_util::{Stream, StreamExt}; + +use crate::settings::Result; + +use super::{ + DockerContainer, DockerContainerLables, DockerServiceEvent, DockerServiceEvents, + DockerServiceInner, +}; + +pub struct BollardDocker { + docker: Docker, +} + +#[async_trait::async_trait] +impl<'a> DockerServiceInner for BollardDocker { + type Events = BollardDockerEvents; + + async fn new() -> Arc + where + Self: Sized, + { + Self { + docker: Docker::connect_with_local_defaults().expect("Failed to connect to Docker"), + } + .into() + } + + fn events(&self) -> Self::Events { + Self::Events::new(Box::pin( + self.docker + .events(Some(EventsOptionsBuilder::new().build())), + )) + } +} + +impl BollardDocker { + pub async fn parse(&self, event: EventMessage) -> DockerServiceEvent { + if event.typ != Some(bollard::secret::EventMessageTypeEnum::CONTAINER) { + return DockerServiceEvent::Other; + } + + let Some(action) = event.action.as_deref() else { + return DockerServiceEvent::Other; + }; + + let Some(id) = 
event.actor.and_then(|x| x.id) else { + return DockerServiceEvent::Other; + }; + + let Some((container, lables)) = self + .get_container(&id) + .await + .ok() + .and_then(|x| self.parse_container_info(x)) + else { + return DockerServiceEvent::Other; + }; + + match action { + "start" => DockerServiceEvent::ContainerStarted(container, lables), + "stop" => DockerServiceEvent::ContainerStopped(container, lables), + _ => DockerServiceEvent::Other, + } + } + + async fn get_container(&self, container_id: &str) -> Result { + Ok(self + .docker + .inspect_container(container_id, None::) + .await?) + } + + fn parse_container_info( + &self, + container: ContainerInspectResponse, + ) -> Option<(DockerContainer, DockerContainerLables)> { + let id = container.id?; + let name = container.name?.trim_start_matches('/').to_owned(); + let lables = container.config?.labels?; + let ip_address = container + .network_settings? + .networks? + .values() + .next()? + .ip_address + .clone() + .and_then(|x| if x.is_empty() { None } else { Some(x) }); + + Some(( + DockerContainer { + id, + name, + ip_address, + }, + lables, + )) + } +} + +pub struct BollardDockerEvents { + stream: Pin> + Send>>, +} + +impl BollardDockerEvents { + pub fn new( + stream: Pin> + Send>>, + ) -> Self { + Self { stream } + } + + async fn next_inner(&mut self, service: Arc) -> Result { + match self.stream.next().await { + Some(Ok(event)) => Ok(service.parse(event).await), + Some(Err(e)) => Err(e.into()), + None => Err("No more events".into()), + } + } +} + +#[async_trait::async_trait] +impl DockerServiceEvents for BollardDockerEvents { + async fn next(&mut self, service: Arc) -> Result { + self.next_inner(service).await + } +} diff --git a/src/docker/mod.rs b/src/docker/mod.rs new file mode 100644 index 0000000..7c396b4 --- /dev/null +++ b/src/docker/mod.rs @@ -0,0 +1,204 @@ +pub mod bollard; + +use std::{collections::HashMap, marker::PhantomData, sync::Arc}; + +use ormlite::Model as _; + +use crate::{ + context::Context, + log, log_err, + models::{Connection, DockerContainer}, + service::Service, + settings::Result, +}; + +pub struct DockerService(PhantomData) +where + T: DockerServiceInner; + +#[async_trait::async_trait] +impl Service for DockerService +where + T: DockerServiceInner, +{ + fn new() -> Arc { + Self(PhantomData).into() + } + + async fn run_inner(self: Arc, context: Context) -> Result<()> { + let inner = T::new().await; + let mut events = inner.events(); + loop { + let event = events.next(inner.clone()).await; + match event { + Ok(event) => self.handle_event(&context, event).await, + Err(err) => { + return Err(err); + } + } + } + } + + fn service_name(&self) -> &'static str { + "Docker" + } +} + +impl DockerService +where + T: DockerServiceInner, +{ + async fn handle_event(&self, context: &Context, event: DockerServiceEvent) { + if let Err(e) = match event { + DockerServiceEvent::ContainerStarted(container, lables) => { + if let Some(ip) = container.ip_address.as_deref() { + let connections = self.parse_connections(ip, lables); + if connections.is_empty() { + return; + } + + self.handle_container_start(context, container, connections) + .await + } else { + Err("Container has no IP address".into()) + } + } + DockerServiceEvent::ContainerStopped(container, lables) => { + let connections = self.parse_connections("", lables); + if connections.is_empty() { + return; + } + + self.handle_container_stop(context, container, connections) + .await + } + _ => Ok(()), + } { + log_err!("Docker event handling failed: {}", e); + } + } + + async 
fn handle_container_start( + &self, + context: &Context, + container: DockerContainer, + connections: Vec, + ) -> Result<()> { + let Some(ip) = container.ip_address.as_deref() else { + return Err("Container has no IP address".into()); + }; + + log!( + "Container started: {} ({}), ip: {}", + container.name, + container.id, + ip + ); + + let name = container.name.clone(); + + let mut conn = context.db().await; + container.insert(&mut *conn).await?; + let mut len = 0; + for connection in connections { + let domain = connection.domain.clone(); + let port = connection.port; + if let Err(e) = connection.insert(&mut *conn).await { + log_err!( + "Cannot insert connection {}:{} for container {}: {}", + domain, + port, + name, + e + ); + } else { + len += 1; + } + } + log!( + "Container {} and {} inner connections inserted into database", + name, + len + ); + + drop(conn); + context.update_ssl().await; + + Ok(()) + } + + async fn handle_container_stop( + &self, + context: &Context, + container: DockerContainer, + connections: Vec, + ) -> Result<()> { + log!("Container stopped: {} ({})", container.name, container.id); + + let name = container.name.clone(); + + let mut conn = context.db().await; + container.delete(&mut *conn).await?; + let mut len = 0; + for connection in connections { + if let Ok(()) = connection.delete(&mut *conn).await { + len += 1; + } + } + log!( + "Container {} and {} inner connections removed from database", + name, + len + ); + + Ok(()) + } + + fn parse_connections( + &self, + ip_address: &str, + lables: HashMap, + ) -> Vec { + let mut connections = Vec::new(); + + for (key, value) in lables { + if let Some(domain) = key.strip_prefix("rrpsfd:") { + if let Ok(port) = value.trim().parse::() { + connections.push(Connection { + domain: domain.to_owned(), + protocol: "http".to_owned(), + ip_address: ip_address.to_owned(), + port, + }); + } + } + } + + connections + } +} + +#[async_trait::async_trait] +pub trait DockerServiceInner: Send + Sync + 'static + Sized { + type Events: DockerServiceEvents; + + async fn new() -> Arc; + + fn events(&self) -> Self::Events; +} + +#[async_trait::async_trait] +pub trait DockerServiceEvents: Send + 'static +where + T: DockerServiceInner, +{ + async fn next(&mut self, service: Arc) -> Result; +} + +pub enum DockerServiceEvent { + ContainerStarted(DockerContainer, DockerContainerLables), + ContainerStopped(DockerContainer, DockerContainerLables), + Other, +} + +pub type DockerContainerLables = HashMap; diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..96700c1 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,18 @@ +pub mod context; +pub mod db; +pub mod logger; +pub mod models; +pub mod settings; + +pub mod runner; +pub mod service; + +pub mod docker; +pub mod server; +pub mod tls; + +pub mod check; +pub mod test; + +mod prelude; +pub use prelude::run_services; diff --git a/src/logger.rs b/src/logger.rs new file mode 100644 index 0000000..98d2f76 --- /dev/null +++ b/src/logger.rs @@ -0,0 +1,25 @@ +pub fn log(level: &str, message: String) { + let now = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(); + println!("{} | {:^7} | {}", now, level, message); +} + +#[macro_export] +macro_rules! log { + ($($args:expr),*) => { + $crate::logger::log("INFO ", format!($($args),*)) + }; +} + +#[macro_export] +macro_rules! log_warn { + ($($args:expr),*) => { + $crate::logger::log("WARNING", format!($($args),*)) + }; +} + +#[macro_export] +macro_rules! 
log_err { + ($($args:expr),*) => { + $crate::logger::log("ERROR", format!($($args),*)) + }; +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..fc7f132 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + rrpsfd::run_services() +} diff --git a/src/models.rs b/src/models.rs new file mode 100644 index 0000000..637bbe0 --- /dev/null +++ b/src/models.rs @@ -0,0 +1,37 @@ +use ormlite::Model; + +#[derive(Clone, Model)] +pub struct Connection { + #[ormlite(primary_key)] + pub domain: String, + pub protocol: String, + pub ip_address: String, + pub port: u16, +} + +impl ToString for Connection { + fn to_string(&self) -> String { + format!( + "connection - {:>15} -> {:>5}://{}:{}", + self.domain, self.protocol, self.ip_address, self.port + ) + } +} + +#[derive(Clone, Model)] +pub struct DockerContainer { + pub id: String, + pub name: String, + pub ip_address: Option, +} + +impl ToString for DockerContainer { + fn to_string(&self) -> String { + format!( + "container#{} - ip: {:>15}, name: {}", + self.id, + self.ip_address.as_deref().unwrap_or("no ip address"), + self.name, + ) + } +} diff --git a/src/prelude.rs b/src/prelude.rs new file mode 100644 index 0000000..931b33f --- /dev/null +++ b/src/prelude.rs @@ -0,0 +1,30 @@ +use std::env::args; + +use crate::runner::Runner as _; +use crate::service::Service as _; +use crate::settings::*; + +macro_rules! run_services { + ($($service:expr),*) => { + CurrentRunner::new().run(vec![ $($service),* ]) + }; +} + +pub fn run_services() { + match args().nth(1).as_deref() { + Some("--app") => { + run_services!( + Tls::new(), + Docker::new(), + Server::new(), + Server::new().https() + ) + } + Some("--test") => { + run_services!(tools::Test::new()) + } + _ => { + run_services!(tools::Check::new()) + } + } +} diff --git a/src/runner/mod.rs b/src/runner/mod.rs new file mode 100644 index 0000000..f5d464e --- /dev/null +++ b/src/runner/mod.rs @@ -0,0 +1,48 @@ +pub mod tokio; + +use std::{panic, sync::Arc}; + +use crate::{context::Context, log, service::Service}; + +pub trait Runner: Send + Sync + 'static { + fn new() -> Arc + where + Self: Sized; + fn run(self: Arc, services: Vec>) { + let _ = std::fs::create_dir_all("/home/data/acme-webroot"); + let orig_hook = panic::take_hook(); + + // Set a custom panic hook + panic::set_hook(Box::new(move |panic_info| { + orig_hook(panic_info); + std::process::exit(1); + })); + + let self_clone = self.clone(); + let body = async move { + let context = Context::new().await; + + let mut iter = services.iter(); + + let block_on = iter.next().expect("No services provided"); + + for service in iter { + log!("Starting service: {}", service.service_name()); + self_clone.spawn(service.clone().run(context.clone())); + } + + log!("Starting main thread service: {}", block_on.service_name()); + block_on.clone().run(context.clone()).await; + }; + + self.block_on(body) + } + fn spawn(&self, future: F) -> () + where + F: Future + Send + 'static, + F::Output: Send + 'static; + + fn block_on(&self, future: F) -> F::Output + where + F: std::future::Future; +} diff --git a/src/runner/tokio.rs b/src/runner/tokio.rs new file mode 100644 index 0000000..98f2cb0 --- /dev/null +++ b/src/runner/tokio.rs @@ -0,0 +1,32 @@ +use std::sync::Arc; + +use super::Runner; + +pub struct TokioRunner(tokio::runtime::Runtime); + +impl Runner for TokioRunner { + fn new() -> Arc { + Self( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("Failed building the Runtime"), + ) + .into() + } + + fn 
block_on(&self, future: F) -> F::Output + where + F: Future, + { + self.0.block_on(future) + } + + fn spawn(&self, future: F) + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.0.spawn(future); + } +} diff --git a/src/server/actix_listener.rs b/src/server/actix_listener.rs new file mode 100644 index 0000000..37778d0 --- /dev/null +++ b/src/server/actix_listener.rs @@ -0,0 +1,297 @@ +use std::{pin::Pin, sync::Arc}; + +use actix_web::http::StatusCode; +use bytes::Bytes; +use futures_util::StreamExt; +use openssl::ssl::{NameType, SniError, SslAcceptor, SslAcceptorBuilder, SslFiletype, SslMethod}; +use tokio::sync::mpsc; + +use crate::settings::Result; + +use super::{ + Context, HttpRequest, HttpResponse, ServerListenerService, ServerProxyService, + ServerSocketProxyService, Socket, SocketMessage, SocketReciever, SocketSender, +}; + +pub struct ActixListener; + +impl ActixListener {} + +#[async_trait::async_trait] +impl ServerListenerService for ActixListener { + async fn new() -> Arc { + Self.into() + } + async fn listen( + &self, + proxy: Arc, + bind_addr: &str, + https: bool, + context: Context, + ) -> Result<()> + where + T1: ServerProxyService, + T2: ServerSocketProxyService, + { + let node = ActixListenerNode::::new(proxy, context.clone()); + loop { + let node = node.clone(); + let app = actix_web::HttpServer::new(move || { + let app = actix_web::App::new().default_service(actix_web::web::to(node.clone())); + + if https { + app + } else { + app.service(acme_challenge) + } + }); + + let server = if https { + // app.bind_openssl(bind_addr, todo!())? + app.bind_openssl(bind_addr, build_ssl_acceptor(context.clone()))? + } else { + app.bind(bind_addr)? + } + .run(); + + server.await?; + } + } +} + +struct ActixListenerNode +where + T1: ServerProxyService, + T2: ServerSocketProxyService, +{ + proxy: Arc, + context: Context, + _marker: std::marker::PhantomData, +} + +impl Clone for ActixListenerNode +where + T1: ServerProxyService, + T2: ServerSocketProxyService, +{ + fn clone(&self) -> Self { + Self { + proxy: self.proxy.clone(), + context: self.context.clone(), + _marker: std::marker::PhantomData, + } + } +} + +impl ActixListenerNode +where + T1: ServerProxyService, + T2: ServerSocketProxyService, +{ + fn new(proxy: Arc, context: Context) -> Self { + Self { + proxy, + context, + _marker: std::marker::PhantomData, + } + .into() + } + async fn unified_service( + &self, + req: actix_web::HttpRequest, + payload: actix_web::web::Payload, + ) -> actix_web::HttpResponse { + let connection = self + .context + .get_connection(req.connection_info().host()) + .await; + + let res = if let Some(connection) = connection { + if actix_http::ws::handshake(req.head()).is_ok() { + let Ok((res, ses, stream)) = actix_ws::handle(&req, payload) else { + return HttpResponse::internal_server_error().into(); + }; + let Ok(backend) = T2::connect(req.into(), connection).await else { + let _ = ses.close(None).await; + return HttpResponse::internal_server_error().into(); + }; + actix_web::rt::spawn(async move { + backend + .proxy_loop(Socket { + sender: ActixSocketSender(ses), + reciever: ActixSocketReciever::from(stream), + }) + .await + }); + res + } else { + HttpResponse::from( + self.proxy + .fetch( + HttpRequest::from(req).set_body(payload.to_bytes().await.ok()), + connection, + ) + .await, + ) + .into() + } + } else { + HttpResponse::not_found().into() + }; + + res.into() + } +} + +impl actix_web::Handler<(actix_web::HttpRequest, actix_web::web::Payload)> + for ActixListenerNode +where + 
T1: ServerProxyService, + T2: ServerSocketProxyService, +{ + type Output = actix_web::HttpResponse; + + type Future = Pin>>; + + fn call(&self, args: (actix_web::HttpRequest, actix_web::web::Payload)) -> Self::Future { + let this = Arc::new(self.clone()); + Box::pin(async move { this.unified_service(args.0, args.1).await }) + } +} + +impl From for HttpRequest { + fn from(req: actix_web::HttpRequest) -> Self { + let method = req.method().clone().to_string(); + + let path = req + .uri() + .path_and_query() + .map(|x| x.to_string()) + .unwrap_or_else(|| req.uri().path().to_string()); + + let headers: Vec<(String, String)> = req + .headers() + .iter() + .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string())) + .collect(); + + Self { + method, + path, + headers, + body: Bytes::new(), + } + } +} + +impl Into for HttpResponse { + fn into(self) -> actix_web::HttpResponse { + let mut builder = actix_web::HttpResponse::build( + StatusCode::from_u16(self.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), + ); + + for (key, value) in self.headers { + builder.insert_header((key, value)); + } + + builder.body(self.body) + } +} + +pub struct ActixSocketSender(actix_ws::Session); + +#[async_trait::async_trait] +impl SocketSender for ActixSocketSender { + async fn send(mut self, msg: SocketMessage) -> Result { + match msg { + SocketMessage::Text(text) => { + self.0.text(text).await?; + Ok(self) + } + SocketMessage::Binary(bin) => { + self.0.binary(bin).await?; + Ok(self) + } + SocketMessage::Ping(payload) => { + self.0.ping(&payload).await?; + Ok(self) + } + SocketMessage::Pong(payload) => { + self.0.pong(&payload).await?; + Ok(self) + } + SocketMessage::Close => { + self.0.close(None).await?; + Err("closed".into()) + } + SocketMessage::None => Ok(self), + } + } +} +pub struct ActixSocketReciever(mpsc::UnboundedReceiver); + +impl From for ActixSocketReciever { + fn from(mut stream: actix_ws::MessageStream) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + + tokio::task::spawn_local(async move { + while let Some(item) = stream.next().await { + let msg = match item { + Ok(actix_ws::Message::Text(text)) => SocketMessage::Text(text.to_string()), + Ok(actix_ws::Message::Binary(bin)) => SocketMessage::Binary(bin), + Ok(actix_ws::Message::Ping(bin)) => SocketMessage::Ping(bin), + Ok(actix_ws::Message::Pong(bin)) => SocketMessage::Pong(bin), + Ok(actix_ws::Message::Close(_)) => SocketMessage::Close, + _ => SocketMessage::None, + }; + // ignore send error (receiver closed) + let _ = tx.send(msg); + } + }); + + Self(rx) + } +} + +#[async_trait::async_trait] +impl SocketReciever for ActixSocketReciever { + async fn next(&mut self) -> Option { + self.0.recv().await + } +} + +#[actix_web::get("/.well-known/acme-challenge/{token}")] +async fn acme_challenge(args: actix_web::web::Path<(String,)>) -> actix_web::HttpResponse { + let token = args.into_inner().0; + if let Ok(body) = tokio::fs::read(format!( + "./data/acme-webroot/.well-known/acme-challenge/{token}" + )) + .await + { + actix_web::HttpResponse::Ok() + .content_type("text/plain") + .body(body) + } else { + actix_web::HttpResponse::NotFound().finish() + } +} + +fn build_ssl_acceptor(context: Context) -> SslAcceptorBuilder { + let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); + + builder.set_servername_callback(move |ssl, _| { + if let Some(server_name) = ssl.servername(NameType::HOST_NAME) { + if let Some((key_path, cert_path)) = context.clone().get_cert(server_name) { + // replace certificate dynamically + 
ssl.set_private_key_file(key_path, SslFiletype::PEM) + .map_err(|_| SniError::ALERT_FATAL)?; + ssl.set_certificate_chain_file(cert_path) + .map_err(|_| SniError::ALERT_FATAL)?; + return Ok(()); + } + } + Err(SniError::ALERT_FATAL) + }); + + builder +} diff --git a/src/server/mod.rs b/src/server/mod.rs new file mode 100644 index 0000000..62e23ec --- /dev/null +++ b/src/server/mod.rs @@ -0,0 +1,245 @@ +pub mod actix_listener; +pub mod reqwest_proxy; +pub mod tokio_tungstenite_socket_proxy; + +use std::sync::Arc; + +use bytes::Bytes; + +use crate::{context::Context, log_err, models::Connection, service::Service, settings::Result}; + +pub struct ServerService +where + T1: ServerListenerService, + T2: ServerProxyService, + T3: ServerSocketProxyService, +{ + https: bool, + _marker: std::marker::PhantomData<(T1, T2, T3)>, +} + +#[async_trait::async_trait] +impl Service for ServerService +where + T1: ServerListenerService, + T2: ServerProxyService, + T3: ServerSocketProxyService, +{ + fn new() -> Arc + where + Self: Sized, + { + Self { + https: false, + _marker: std::marker::PhantomData, + } + .into() + } + + async fn run_inner(self: Arc, context: Context) -> Result<()> { + let listener = T1::new().await; + let proxy = T2::new().await; + if self.https { + listener + .listen::(proxy, "0.0.0.0:443", true, context) + .await + } else { + listener + .listen::(proxy, "0.0.0.0:80", false, context) + .await + } + } + + fn service_name(&self) -> &'static str { + if self.https { + "Https server" + } else { + "Http server" + } + } +} + +impl ServerService +where + T1: ServerListenerService, + T2: ServerProxyService, + T3: ServerSocketProxyService, +{ + pub fn https(&self) -> Arc { + Self { + https: true, + _marker: std::marker::PhantomData, + } + .into() + } +} + +#[async_trait::async_trait] +pub trait ServerListenerService: Send + Sync + 'static + Sized { + async fn new() -> Arc; + + async fn listen( + &self, + proxy: Arc, + bind_addr: &str, + https: bool, + context: Context, + ) -> Result<()> + where + T1: ServerProxyService, + T2: ServerSocketProxyService; +} + +#[async_trait::async_trait] +pub trait ServerProxyService: Send + Sync + 'static + Sized { + async fn new() -> Arc; + + async fn fetch(&self, req: HttpRequest, connection: Connection) -> Result; +} + +pub struct HttpRequest { + pub method: String, + pub path: String, + pub headers: Vec<(String, String)>, + pub body: Bytes, +} + +impl HttpRequest { + pub fn new(method: String, path: String, headers: Vec<(String, String)>, body: Bytes) -> Self { + Self { + method, + path, + headers, + body, + } + } + + pub fn set_body(mut self, body: Option) -> Self { + if let Some(body) = body { + self.body = body; + } + self + } +} + +pub struct HttpResponse { + pub status: u16, + pub headers: Vec<(String, String)>, + pub body: Bytes, +} + +impl HttpResponse { + pub fn new(status: u16, headers: Vec<(String, String)>, body: Bytes) -> Self { + Self { + status, + headers, + body, + } + } + + pub fn internal_server_error() -> Self { + Self::new(500, vec![], Bytes::new()) + } + pub fn not_found() -> Self { + Self::new(404, vec![], Bytes::new()) + } +} + +impl From> for HttpResponse { + fn from(value: Result) -> Self { + match value { + Ok(response) => response, + Err(e) => { + log_err!("Server proxy error: {}", e); + HttpResponse::internal_server_error() + } + } + } +} + +pub struct Socket +where + T1: SocketSender, + T2: SocketReciever, +{ + pub sender: T1, + pub reciever: T2, +} + +#[async_trait::async_trait] +pub trait SocketSender: Send + 'static + Sized { + async 
fn send(mut self, msg: SocketMessage) -> Result; +} +#[async_trait::async_trait] +pub trait SocketReciever: Send + 'static + Sized { + async fn next(&mut self) -> Option; +} + +#[async_trait::async_trait] +pub trait ServerSocketProxyService: Sync + Send + 'static + Sized { + type Sender: SocketSender; + type Reciever: SocketReciever; + + async fn connect(req: HttpRequest, connection: Connection) -> Result; + fn get_socket(self) -> Socket; + + async fn proxy_loop(self, client: Socket) -> Result<()> + where + T1: SocketSender, + T2: SocketReciever, + { + let backend = self.get_socket(); + let mut reciever = client.reciever; + let mut sender = backend.sender; + let c2b = async move { + while let Some(msg) = reciever.next().await { + match msg { + SocketMessage::Close => { + let _ = sender.send(SocketMessage::Close).await; + break; + } + SocketMessage::None => {} + msg => match sender.send(msg).await { + Ok(s) => sender = s, + Err(e) => { + log_err!("WebSocket proxy send error: {}", e); + break; + } + }, + } + } + }; + let mut sender = client.sender; + let mut reciever = backend.reciever; + let b2c = async move { + while let Some(msg) = reciever.next().await { + match msg { + SocketMessage::Close => { + let _ = sender.send(SocketMessage::Close).await; + break; + } + SocketMessage::None => {} + msg => match sender.send(msg).await { + Ok(s) => sender = s, + Err(e) => { + log_err!("WebSocket proxy send error: {}", e); + break; + } + }, + } + } + }; + + futures_util::future::join(c2b, b2c).await; + Ok(()) + } +} + +pub enum SocketMessage { + Text(String), + Binary(Bytes), + Ping(Bytes), + Pong(Bytes), + Close, + None, +} diff --git a/src/server/reqwest_proxy.rs b/src/server/reqwest_proxy.rs new file mode 100644 index 0000000..a459dac --- /dev/null +++ b/src/server/reqwest_proxy.rs @@ -0,0 +1,56 @@ +use std::{str::FromStr, sync::Arc}; + +use bytes::Bytes; +use reqwest::{ + Method, + header::{HeaderMap, HeaderName, HeaderValue}, +}; + +use crate::settings::Result; + +use super::{Connection, HttpRequest, HttpResponse, ServerProxyService}; + +pub struct ReqwestProxy { + client: reqwest::Client, +} + +#[async_trait::async_trait] +impl ServerProxyService for ReqwestProxy { + async fn new() -> Arc { + Self { + client: reqwest::Client::new(), + } + .into() + } + async fn fetch(&self, req: HttpRequest, connection: Connection) -> Result { + let mut headers = HeaderMap::new(); + for (key, value) in req.headers { + headers.insert(key.parse::()?, value.parse::()?); + } + let res = self + .client + .request( + Method::from_str(&req.method)?, + format!( + "{}://{}:{}{}", + connection.protocol, connection.ip_address, connection.port, req.path + ), + ) + .headers(headers) + .body(req.body) + .send() + .await?; + + let headers = res + .headers() + .iter() + .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string())) + .collect(); + + Ok(HttpResponse::new( + res.status().as_u16(), + headers, + res.bytes().await.ok().unwrap_or_else(|| Bytes::new()), + )) + } +} diff --git a/src/server/tokio_tungstenite_socket_proxy.rs b/src/server/tokio_tungstenite_socket_proxy.rs new file mode 100644 index 0000000..9494116 --- /dev/null +++ b/src/server/tokio_tungstenite_socket_proxy.rs @@ -0,0 +1,93 @@ +use futures_util::{ + SinkExt, StreamExt, + stream::{SplitSink, SplitStream}, +}; +use tokio::net::TcpStream; +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Message}; + +use crate::settings::Result; + +use super::{ + Connection, HttpRequest, ServerSocketProxyService, Socket, 
SocketMessage, SocketReciever, + SocketSender, +}; + +pub struct TokioTungsteniteSocketProxy( + Socket, +); + +#[async_trait::async_trait] +impl ServerSocketProxyService for TokioTungsteniteSocketProxy { + type Sender = TokioTungsteniteSocketSender; + type Reciever = TokioTungsteniteSocketReciever; + + async fn connect(req: HttpRequest, connection: Connection) -> Result { + let url = format!( + "{}://{}:{}{}", + connection.protocol.replace("http", "ws"), + connection.ip_address, + connection.port, + req.path + ); + let (ws_stream, _) = connect_async(url).await?; + + let (ses, stream) = ws_stream.split(); + + Ok(Self(Socket { + sender: TokioTungsteniteSocketSender(ses), + reciever: TokioTungsteniteSocketReciever(stream), + })) + } + fn get_socket(self) -> Socket { + self.0 + } +} + +pub struct TokioTungsteniteSocketSender( + SplitSink>, Message>, +); + +#[async_trait::async_trait] +impl SocketSender for TokioTungsteniteSocketSender { + async fn send(mut self, msg: SocketMessage) -> Result { + match msg { + SocketMessage::Text(text) => { + self.0.send(Message::Text(text.into())).await?; + Ok(self) + } + SocketMessage::Binary(bin) => { + self.0.send(Message::Binary(bin)).await?; + Ok(self) + } + SocketMessage::Ping(payload) => { + self.0.send(Message::Ping(payload)).await?; + Ok(self) + } + SocketMessage::Pong(payload) => { + self.0.send(Message::Pong(payload)).await?; + Ok(self) + } + SocketMessage::Close => { + self.0.close().await?; + Err("closed".into()) + } + SocketMessage::None => Ok(self), + } + } +} + +pub struct TokioTungsteniteSocketReciever(SplitStream>>); + +#[async_trait::async_trait] +impl SocketReciever for TokioTungsteniteSocketReciever { + async fn next(&mut self) -> Option { + self.0.next().await.map(|x| match x { + Ok(Message::Text(text)) => SocketMessage::Text(text.to_string()), + Ok(Message::Binary(bin)) => SocketMessage::Binary(bin), + Ok(Message::Ping(bin)) => SocketMessage::Ping(bin), + Ok(Message::Pong(bin)) => SocketMessage::Pong(bin), + Ok(Message::Close(_)) => SocketMessage::Close, + _ => SocketMessage::None, + }) + } +} diff --git a/src/service.rs b/src/service.rs new file mode 100644 index 0000000..33a6c36 --- /dev/null +++ b/src/service.rs @@ -0,0 +1,24 @@ +use std::sync::Arc; + +use crate::{context::Context, log_err, log_warn, settings::Result}; + +#[async_trait::async_trait] +pub trait Service: Sync + Send + 'static { + fn new() -> Arc + where + Self: Sized; + + async fn run(self: Arc, context: Context) { + let mut code = 0; + if let Err(e) = self.clone().run_inner(context.clone()).await { + log_err!("{} error: {}", self.service_name(), e); + code = 1; + } + log_warn!("{} stopped: exiting...", self.service_name()); + std::process::exit(code); + } + + async fn run_inner(self: Arc, context: Context) -> Result<()>; + + fn service_name(&self) -> &'static str; +} diff --git a/src/settings.rs b/src/settings.rs new file mode 100644 index 0000000..9f5fc06 --- /dev/null +++ b/src/settings.rs @@ -0,0 +1,36 @@ +// Runner +pub use crate::runner::tokio::TokioRunner as CurrentRunner; +pub use tokio::{ + process::Command as AsyncCommand, + sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard}, +}; + +// Docker +pub type Docker = crate::docker::DockerService; + +// Server +pub type Server = crate::server::ServerService< + crate::server::actix_listener::ActixListener, + crate::server::reqwest_proxy::ReqwestProxy, + crate::server::tokio_tungstenite_socket_proxy::TokioTungsteniteSocketProxy, +>; + +// Tls +pub type Tls = crate::tls::TlsService< + 
crate::tls::certbot_updater::CertbotUpdater, + crate::tls::ssl_expiration_checker::SslExpirationChecker, +>; + +// Check and Test tools +pub mod tools { + pub type Check = crate::check::CheckService; + pub type Test = crate::test::TestService; +} + +// Error +pub type Error = Box; +pub type Result = std::result::Result; + +pub async fn async_sleep(duration: std::time::Duration) { + tokio::time::sleep(duration).await; +} diff --git a/src/test/mod.rs b/src/test/mod.rs new file mode 100644 index 0000000..3d48950 --- /dev/null +++ b/src/test/mod.rs @@ -0,0 +1,24 @@ +use std::sync::Arc; + +use crate::{context::Context, log, service::Service, settings::Result}; + +pub struct TestService; + +#[async_trait::async_trait] +impl Service for TestService { + fn new() -> Arc + where + Self: Sized, + { + Self.into() + } + + async fn run_inner(self: Arc, _context: Context) -> Result<()> { + log!("No tests avaliable..."); + Ok(()) + } + + fn service_name(&self) -> &'static str { + "Test" + } +} diff --git a/src/tls/certbot_updater.rs b/src/tls/certbot_updater.rs new file mode 100644 index 0000000..3c92445 --- /dev/null +++ b/src/tls/certbot_updater.rs @@ -0,0 +1,53 @@ +use std::{fs, sync::Arc}; + +use crate::settings::{AsyncCommand, Result}; + +use super::TlsUpdaterService; + +pub struct CertbotUpdater; + +#[async_trait::async_trait] +impl TlsUpdaterService for CertbotUpdater { + fn new() -> Arc { + Self.into() + } + + async fn update_certificate(&self, email: &str, domain: &str) -> Result<(String, String)> { + if AsyncCommand::new("certbot") + .arg("certonly") + .arg("--webroot") + .arg("-w") + .arg("/home/data/acme-webroot") + .arg("--cert-name") + .arg(domain) + .arg("-d") + .arg(domain) + .arg("--email") + .arg(email) + .arg("--agree-tos") + .arg("--non-interactive") + .spawn()? + .wait() + .await? 
+ .success() + { + self.get_certificate(domain).await + } else { + Err("Failed to update certificate".into()) + } + } + + async fn get_certificate(&self, domain: &str) -> Result<(String, String)> { + let key_path = format!("/etc/letsencrypt/live/{domain}/privkey.pem"); + if !fs::exists(&key_path).unwrap_or_default() { + return Err(format!("{key_path} not found").into()); + } + + let cert_path = format!("/etc/letsencrypt/live/{domain}/fullchain.pem"); + if !fs::exists(&cert_path).unwrap_or_default() { + return Err(format!("{cert_path} not found").into()); + } + + Ok((key_path, cert_path)) + } +} diff --git a/src/tls/mod.rs b/src/tls/mod.rs new file mode 100644 index 0000000..cc47345 --- /dev/null +++ b/src/tls/mod.rs @@ -0,0 +1,133 @@ +pub mod certbot_updater; +pub mod ssl_expiration_checker; + +use std::{env, sync::Arc, time::Duration}; + +use ormlite::Model; + +use crate::{ + context::{Context, Event}, + log_err, + models::Connection, + service::Service, + settings::{Result, async_sleep}, +}; + +#[derive(Clone)] +pub struct TlsService +where + T1: TlsUpdaterService, + T2: TlsCheckerService, +{ + updater: Arc, + checker: Arc, + email: String, +} + +#[async_trait::async_trait] +impl Service for TlsService +where + T1: TlsUpdaterService, + T2: TlsCheckerService, +{ + fn new() -> Arc + where + Self: Sized, + { + Self { + updater: T1::new(), + checker: T2::new(), + email: env::var("EMAIL") + .expect( + "For Tls service to work, you must specify the 'EMAIL' environment variable.", + ) + .trim() + .to_owned(), + } + .into() + } + + async fn run_inner(self: Arc, context: Context) -> Result<()> { + context.on_update_ssl(self.clone()).await; + let hour = Duration::from_secs(60 * 60); + + loop { + if let Err(err) = self.invoke(&context).await { + log_err!("Tls error: {}", err); + } + + async_sleep(hour).await; + } + } + + fn service_name(&self) -> &'static str { + "Tls" + } +} + +#[async_trait::async_trait] +impl Event for TlsService +where + T1: TlsUpdaterService, + T2: TlsCheckerService, +{ + async fn invoke(&self, context: &Context) -> Result<()> { + let mut conn = context.db().await; + let connections = Connection::select().fetch_all(&mut *conn).await?; + drop(conn); + + for domain in connections.iter().map(|x| x.domain.as_str()) { + if context.get_cert(domain).is_none() { + match if let Ok(cert) = self.updater.get_certificate(domain).await { + Ok(cert) + } else { + self.updater.update_certificate(&self.email, domain).await + } { + Ok(cert) => { + context.add_cert(domain.to_owned(), cert).await; + } + Err(e) => { + log_err!("Failed to get certificate for {}: {}", domain, e); + } + } + } else { + if self.checker.check(domain).await.is_err() { + match self.updater.update_certificate(&self.email, domain).await { + Ok(cert) => { + context.add_cert(domain.to_owned(), cert).await; + } + Err(e) => { + log_err!("Failed to update certificate for {}: {}", domain, e); + } + } + } + } + } + + Ok(()) + } +} + +#[async_trait::async_trait] +pub trait TlsUpdaterService: Send + Sync + 'static + Sized { + fn new() -> Arc; + + async fn update_certificate(&self, email: &str, domain: &str) -> Result<(String, String)>; + + async fn get_certificate(&self, domain: &str) -> Result<(String, String)>; +} + +pub struct NeedsRenewal; + +impl From for NeedsRenewal { + fn from(_: T) -> Self { + NeedsRenewal + } +} + +#[async_trait::async_trait] +pub trait TlsCheckerService: Send + Sync + 'static + Sized { + fn new() -> Arc; + + async fn check(&self, domain: &str) -> std::result::Result<(), NeedsRenewal>; +} diff --git 
a/src/tls/ssl_expiration_checker.rs b/src/tls/ssl_expiration_checker.rs new file mode 100644 index 0000000..2a61d0c --- /dev/null +++ b/src/tls/ssl_expiration_checker.rs @@ -0,0 +1,20 @@ +use std::sync::Arc;
+
+use super::{NeedsRenewal, TlsCheckerService};
+
+pub struct SslExpirationChecker;
+
+#[async_trait::async_trait]
+impl TlsCheckerService for SslExpirationChecker {
+    fn new() -> Arc<Self> {
+        Self.into()
+    }
+    async fn check(&self, domain: &str) -> Result<(), NeedsRenewal> {
+        let expires = ssl_expiration2::SslExpiration::from_domain_name(domain)?;
+        if expires.days() < 7 {
+            Err(NeedsRenewal)
+        } else {
+            Ok(())
+        }
+    }
+}