Release 0.1.0

This commit is contained in:
Fasterino
2025-10-31 04:27:42 +03:00
commit 6fe2f12b8e
30 changed files with 2031 additions and 0 deletions

204
src/check/mod.rs Normal file
View File

@@ -0,0 +1,204 @@
use std::{
fmt::Display,
io::{IsTerminal, Stdin, stdin},
sync::Arc,
};
use ormlite::Model;
use sqlx::SqliteConnection;
use crate::{
context::Context,
log, log_err, log_warn,
models::{Connection, DockerContainer},
service::Service,
settings::{AsyncMutexGuard, Result},
};
pub struct CheckService;
#[async_trait::async_trait]
impl Service for CheckService {
fn new() -> Arc<Self>
where
Self: Sized,
{
Self.into()
}
async fn run_inner(self: Arc<Self>, context: Context) -> Result<()> {
let stdio = stdin();
let is_terminal = stdio.is_terminal();
loop {
let mut conn = context.db().await;
log!("Loading database...");
log!("Getting docker containers...");
let containers = DockerContainer::select().fetch_all(&mut *conn).await?;
let containers_str = containers
.iter()
.map(|x| x.to_string())
.collect::<Vec<_>>()
.join("\n");
log!(
"Found {} containers\n{}\n",
containers.len(),
containers_str
);
log!("Getting connections...");
let connections = Connection::select().fetch_all(&mut *conn).await?;
let connections_str = connections
.iter()
.map(|x| x.to_string())
.collect::<Vec<_>>()
.join("\n");
log!(
"Found {} connections\n{}\n",
connections.len(),
connections_str
);
if !is_terminal {
log!(
"If you want to add or remove connections run command:\ndocker exec -it <container_name> rrpsfd"
);
break;
}
match CheckCommand::poll(&stdio, conn).await {
Ok(true) => break,
Ok(false) => continue,
Err(e) => {
log_err!("{e}\n\n")
}
}
}
Ok(())
}
fn service_name(&self) -> &'static str {
"Test"
}
}
enum CheckCommand {
AddConnection,
RemoveConnection,
Exit,
}
impl Display for CheckCommand {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
CheckCommand::AddConnection => {
f.write_str("add <domain> <protocol>://<host>:<port> - Add conection with specified domain and link")
}
CheckCommand::RemoveConnection => f.write_str("remove <domain> - Remove conection with specified domain"),
CheckCommand::Exit => f.write_str("exit - Exit from service mode>"),
}
}
}
impl CheckCommand {
pub async fn poll(stdio: &Stdin, conn: AsyncMutexGuard<'_, SqliteConnection>) -> Result<bool> {
log!(
"Enter one of this command:\n{}\n{}\n{}\n",
Self::AddConnection,
Self::RemoveConnection,
Self::Exit
);
let mut input = String::new();
stdio.read_line(&mut input)?;
let mut parts = input.trim().split(' ');
let command =
Self::get_command(parts.next().ok_or("No input!")?).ok_or("Wrong command!")?;
match command {
CheckCommand::AddConnection => {
let domain = parts.next().ok_or("No domain specified!")?;
let link = parts.next().ok_or("No link specified!")?;
let (protocol, parts) = link.split_once("://").ok_or("Wrong link format!")?;
if protocol != "http" && protocol != "https" {
return Err("Supported protocols: http, https".into());
}
let (host, port) = parts.split_once(":").ok_or("Wrong link format!")?;
let port: u16 = port.parse().map_err(|_| "Wrong link format!")?;
Self::add_connection(
conn,
domain.to_owned(),
protocol.to_owned(),
host.to_owned(),
port,
)
.await?;
log!(
"Ok! The SSL certificate will be ready in an hour. If you don't want to wait, restart the container.\n\n"
);
Ok(false)
}
CheckCommand::RemoveConnection => {
let domain = parts.next().ok_or("No domain specified!")?;
Self::remove_connection(stdio, conn, domain.to_owned()).await?;
log!("Ok!\n\n");
Ok(false)
}
CheckCommand::Exit => Ok(true),
}
}
fn get_command(command: &str) -> Option<Self> {
match command {
"add" => Some(Self::AddConnection),
"remove" => Some(Self::RemoveConnection),
"exit" => Some(Self::Exit),
_ => None,
}
}
async fn add_connection(
mut conn: AsyncMutexGuard<'_, SqliteConnection>,
domain: String,
protocol: String,
host: String,
port: u16,
) -> Result<()> {
Connection {
domain,
protocol,
ip_address: host,
port,
}
.insert(&mut *conn)
.await?;
Ok(())
}
async fn remove_connection(
stdio: &Stdin,
mut conn: AsyncMutexGuard<'_, SqliteConnection>,
domain: String,
) -> Result<()> {
let connection = Connection::fetch_one(domain, &mut *conn).await?;
if let Ok(container) = DockerContainer::select()
.where_("ip_address = ?")
.bind(&connection.ip_address)
.fetch_one(&mut *conn)
.await
{
log_warn!(
"This connection is part of container with name {}, type 'confirm' if you want to delete this connection:",
container.name
);
let mut confirm = String::new();
stdio.read_line(&mut confirm)?;
if !confirm.trim().eq_ignore_ascii_case("confirm") {
return Err("Deletion was rejected".into());
}
}
connection.delete(&mut *conn).await?;
Ok(())
}
}

63
src/context.rs Normal file
View File

@@ -0,0 +1,63 @@
use std::{collections::HashMap, sync::Arc};
use ormlite::{Model, sqlite::SqliteConnection};
use crate::{
db::Db,
log_err,
models::Connection,
settings::{AsyncMutex, AsyncMutexGuard, Result},
};
#[async_trait::async_trait]
pub trait Event: Send + Sync {
async fn invoke(&self, context: &Context) -> Result<()>;
}
#[derive(Clone)]
pub struct Context {
db: Arc<AsyncMutex<SqliteConnection>>,
certificates: Arc<AsyncMutex<HashMap<String, (String, String)>>>,
on_update_ssl: Arc<AsyncMutex<Option<Arc<dyn Event>>>>,
}
impl Context {
pub async fn new() -> Self {
Context {
db: Db::connect().await,
certificates: AsyncMutex::new(HashMap::new()).into(),
on_update_ssl: AsyncMutex::new(None).into(),
}
}
pub async fn db(&'_ self) -> AsyncMutexGuard<'_, SqliteConnection> {
self.db.lock().await
}
pub async fn get_connection(&'_ self, domain: &str) -> Option<Connection> {
let mut conn = self.db().await;
Connection::fetch_one(domain, &mut *conn).await.ok()
}
pub async fn on_update_ssl(&self, event: Arc<dyn Event>) {
let mut on_update_ssl = self.on_update_ssl.lock().await;
*on_update_ssl = Some(event.clone());
}
pub async fn update_ssl(&self) {
let on_update_ssl = self.on_update_ssl.lock().await;
if let Some(event) = on_update_ssl.clone() {
if let Err(err) = event.invoke(self).await {
log_err!("Failed to update SSL: {}", err);
}
}
}
pub async fn add_cert(&self, domain: String, cert: (String, String)) {
let mut certificates = self.certificates.lock().await;
certificates.insert(domain, cert);
}
pub fn get_cert(&self, domain: &str) -> Option<(String, String)> {
let certificates = self.certificates.try_lock().ok()?;
certificates.get(domain).cloned()
}
}

32
src/db.rs Normal file
View File

@@ -0,0 +1,32 @@
use std::{fs, sync::Arc};
use ormlite::{
Connection,
sqlite::{SqliteConnectOptions, SqliteConnection},
};
use crate::settings::AsyncMutex;
pub struct Db;
const DB_PATH: &str = "./data/rs.db";
impl Db {
pub async fn connect() -> Arc<AsyncMutex<SqliteConnection>> {
let options = SqliteConnectOptions::new()
.filename(DB_PATH)
.create_if_missing(true);
let mut db = SqliteConnection::connect_with(&options)
.await
.expect("Cannot connect to db");
let sql = fs::read_to_string("./schema.sql").expect("Cannot read schema file");
sqlx::query(&sql)
.execute(&mut db)
.await
.expect("Cannot initialize database");
AsyncMutex::new(db).into()
}
}

133
src/docker/bollard.rs Normal file
View File

@@ -0,0 +1,133 @@
use std::{pin::Pin, sync::Arc};
use bollard::{
Docker,
errors::Error,
query_parameters::{EventsOptionsBuilder, InspectContainerOptions},
secret::{ContainerInspectResponse, EventMessage},
};
use futures_util::{Stream, StreamExt};
use crate::settings::Result;
use super::{
DockerContainer, DockerContainerLables, DockerServiceEvent, DockerServiceEvents,
DockerServiceInner,
};
pub struct BollardDocker {
docker: Docker,
}
#[async_trait::async_trait]
impl<'a> DockerServiceInner for BollardDocker {
type Events = BollardDockerEvents;
async fn new() -> Arc<Self>
where
Self: Sized,
{
Self {
docker: Docker::connect_with_local_defaults().expect("Failed to connect to Docker"),
}
.into()
}
fn events(&self) -> Self::Events {
Self::Events::new(Box::pin(
self.docker
.events(Some(EventsOptionsBuilder::new().build())),
))
}
}
impl BollardDocker {
pub async fn parse(&self, event: EventMessage) -> DockerServiceEvent {
if event.typ != Some(bollard::secret::EventMessageTypeEnum::CONTAINER) {
return DockerServiceEvent::Other;
}
let Some(action) = event.action.as_deref() else {
return DockerServiceEvent::Other;
};
let Some(id) = event.actor.and_then(|x| x.id) else {
return DockerServiceEvent::Other;
};
let Some((container, lables)) = self
.get_container(&id)
.await
.ok()
.and_then(|x| self.parse_container_info(x))
else {
return DockerServiceEvent::Other;
};
match action {
"start" => DockerServiceEvent::ContainerStarted(container, lables),
"stop" => DockerServiceEvent::ContainerStopped(container, lables),
_ => DockerServiceEvent::Other,
}
}
async fn get_container(&self, container_id: &str) -> Result<ContainerInspectResponse> {
Ok(self
.docker
.inspect_container(container_id, None::<InspectContainerOptions>)
.await?)
}
fn parse_container_info(
&self,
container: ContainerInspectResponse,
) -> Option<(DockerContainer, DockerContainerLables)> {
let id = container.id?;
let name = container.name?.trim_start_matches('/').to_owned();
let lables = container.config?.labels?;
let ip_address = container
.network_settings?
.networks?
.values()
.next()?
.ip_address
.clone()
.and_then(|x| if x.is_empty() { None } else { Some(x) });
Some((
DockerContainer {
id,
name,
ip_address,
},
lables,
))
}
}
pub struct BollardDockerEvents {
stream: Pin<Box<dyn Stream<Item = std::result::Result<EventMessage, Error>> + Send>>,
}
impl BollardDockerEvents {
pub fn new(
stream: Pin<Box<dyn Stream<Item = std::result::Result<EventMessage, Error>> + Send>>,
) -> Self {
Self { stream }
}
async fn next_inner(&mut self, service: Arc<BollardDocker>) -> Result<DockerServiceEvent> {
match self.stream.next().await {
Some(Ok(event)) => Ok(service.parse(event).await),
Some(Err(e)) => Err(e.into()),
None => Err("No more events".into()),
}
}
}
#[async_trait::async_trait]
impl DockerServiceEvents<BollardDocker> for BollardDockerEvents {
async fn next(&mut self, service: Arc<BollardDocker>) -> Result<DockerServiceEvent> {
self.next_inner(service).await
}
}

204
src/docker/mod.rs Normal file
View File

@@ -0,0 +1,204 @@
pub mod bollard;
use std::{collections::HashMap, marker::PhantomData, sync::Arc};
use ormlite::Model as _;
use crate::{
context::Context,
log, log_err,
models::{Connection, DockerContainer},
service::Service,
settings::Result,
};
pub struct DockerService<T>(PhantomData<T>)
where
T: DockerServiceInner;
#[async_trait::async_trait]
impl<T> Service for DockerService<T>
where
T: DockerServiceInner,
{
fn new() -> Arc<Self> {
Self(PhantomData).into()
}
async fn run_inner(self: Arc<Self>, context: Context) -> Result<()> {
let inner = T::new().await;
let mut events = inner.events();
loop {
let event = events.next(inner.clone()).await;
match event {
Ok(event) => self.handle_event(&context, event).await,
Err(err) => {
return Err(err);
}
}
}
}
fn service_name(&self) -> &'static str {
"Docker"
}
}
impl<T> DockerService<T>
where
T: DockerServiceInner,
{
async fn handle_event(&self, context: &Context, event: DockerServiceEvent) {
if let Err(e) = match event {
DockerServiceEvent::ContainerStarted(container, lables) => {
if let Some(ip) = container.ip_address.as_deref() {
let connections = self.parse_connections(ip, lables);
if connections.is_empty() {
return;
}
self.handle_container_start(context, container, connections)
.await
} else {
Err("Container has no IP address".into())
}
}
DockerServiceEvent::ContainerStopped(container, lables) => {
let connections = self.parse_connections("", lables);
if connections.is_empty() {
return;
}
self.handle_container_stop(context, container, connections)
.await
}
_ => Ok(()),
} {
log_err!("Docker event handling failed: {}", e);
}
}
async fn handle_container_start(
&self,
context: &Context,
container: DockerContainer,
connections: Vec<Connection>,
) -> Result<()> {
let Some(ip) = container.ip_address.as_deref() else {
return Err("Container has no IP address".into());
};
log!(
"Container started: {} ({}), ip: {}",
container.name,
container.id,
ip
);
let name = container.name.clone();
let mut conn = context.db().await;
container.insert(&mut *conn).await?;
let mut len = 0;
for connection in connections {
let domain = connection.domain.clone();
let port = connection.port;
if let Err(e) = connection.insert(&mut *conn).await {
log_err!(
"Cannot insert connection {}:{} for container {}: {}",
domain,
port,
name,
e
);
} else {
len += 1;
}
}
log!(
"Container {} and {} inner connections inserted into database",
name,
len
);
drop(conn);
context.update_ssl().await;
Ok(())
}
async fn handle_container_stop(
&self,
context: &Context,
container: DockerContainer,
connections: Vec<Connection>,
) -> Result<()> {
log!("Container stopped: {} ({})", container.name, container.id);
let name = container.name.clone();
let mut conn = context.db().await;
container.delete(&mut *conn).await?;
let mut len = 0;
for connection in connections {
if let Ok(()) = connection.delete(&mut *conn).await {
len += 1;
}
}
log!(
"Container {} and {} inner connections removed from database",
name,
len
);
Ok(())
}
fn parse_connections(
&self,
ip_address: &str,
lables: HashMap<String, String>,
) -> Vec<Connection> {
let mut connections = Vec::new();
for (key, value) in lables {
if let Some(domain) = key.strip_prefix("rrpsfd:") {
if let Ok(port) = value.trim().parse::<u16>() {
connections.push(Connection {
domain: domain.to_owned(),
protocol: "http".to_owned(),
ip_address: ip_address.to_owned(),
port,
});
}
}
}
connections
}
}
#[async_trait::async_trait]
pub trait DockerServiceInner: Send + Sync + 'static + Sized {
type Events: DockerServiceEvents<Self>;
async fn new() -> Arc<Self>;
fn events(&self) -> Self::Events;
}
#[async_trait::async_trait]
pub trait DockerServiceEvents<T>: Send + 'static
where
T: DockerServiceInner,
{
async fn next(&mut self, service: Arc<T>) -> Result<DockerServiceEvent>;
}
pub enum DockerServiceEvent {
ContainerStarted(DockerContainer, DockerContainerLables),
ContainerStopped(DockerContainer, DockerContainerLables),
Other,
}
pub type DockerContainerLables = HashMap<String, String>;

18
src/lib.rs Normal file
View File

@@ -0,0 +1,18 @@
pub mod context;
pub mod db;
pub mod logger;
pub mod models;
pub mod settings;
pub mod runner;
pub mod service;
pub mod docker;
pub mod server;
pub mod tls;
pub mod check;
pub mod test;
mod prelude;
pub use prelude::run_services;

25
src/logger.rs Normal file
View File

@@ -0,0 +1,25 @@
pub fn log(level: &str, message: String) {
let now = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
println!("{} | {:^7} | {}", now, level, message);
}
#[macro_export]
macro_rules! log {
($($args:expr),*) => {
$crate::logger::log("INFO ", format!($($args),*))
};
}
#[macro_export]
macro_rules! log_warn {
($($args:expr),*) => {
$crate::logger::log("WARNING", format!($($args),*))
};
}
#[macro_export]
macro_rules! log_err {
($($args:expr),*) => {
$crate::logger::log("ERROR", format!($($args),*))
};
}

3
src/main.rs Normal file
View File

@@ -0,0 +1,3 @@
fn main() {
rrpsfd::run_services()
}

37
src/models.rs Normal file
View File

@@ -0,0 +1,37 @@
use ormlite::Model;
#[derive(Clone, Model)]
pub struct Connection {
#[ormlite(primary_key)]
pub domain: String,
pub protocol: String,
pub ip_address: String,
pub port: u16,
}
impl ToString for Connection {
fn to_string(&self) -> String {
format!(
"connection - {:>15} -> {:>5}://{}:{}",
self.domain, self.protocol, self.ip_address, self.port
)
}
}
#[derive(Clone, Model)]
pub struct DockerContainer {
pub id: String,
pub name: String,
pub ip_address: Option<String>,
}
impl ToString for DockerContainer {
fn to_string(&self) -> String {
format!(
"container#{} - ip: {:>15}, name: {}",
self.id,
self.ip_address.as_deref().unwrap_or("no ip address"),
self.name,
)
}
}

30
src/prelude.rs Normal file
View File

@@ -0,0 +1,30 @@
use std::env::args;
use crate::runner::Runner as _;
use crate::service::Service as _;
use crate::settings::*;
macro_rules! run_services {
($($service:expr),*) => {
CurrentRunner::new().run(vec![ $($service),* ])
};
}
pub fn run_services() {
match args().nth(1).as_deref() {
Some("--app") => {
run_services!(
Tls::new(),
Docker::new(),
Server::new(),
Server::new().https()
)
}
Some("--test") => {
run_services!(tools::Test::new())
}
_ => {
run_services!(tools::Check::new())
}
}
}

48
src/runner/mod.rs Normal file
View File

@@ -0,0 +1,48 @@
pub mod tokio;
use std::{panic, sync::Arc};
use crate::{context::Context, log, service::Service};
pub trait Runner: Send + Sync + 'static {
fn new() -> Arc<Self>
where
Self: Sized;
fn run(self: Arc<Self>, services: Vec<Arc<dyn Service>>) {
let _ = std::fs::create_dir_all("/home/data/acme-webroot");
let orig_hook = panic::take_hook();
// Set a custom panic hook
panic::set_hook(Box::new(move |panic_info| {
orig_hook(panic_info);
std::process::exit(1);
}));
let self_clone = self.clone();
let body = async move {
let context = Context::new().await;
let mut iter = services.iter();
let block_on = iter.next().expect("No services provided");
for service in iter {
log!("Starting service: {}", service.service_name());
self_clone.spawn(service.clone().run(context.clone()));
}
log!("Starting main thread service: {}", block_on.service_name());
block_on.clone().run(context.clone()).await;
};
self.block_on(body)
}
fn spawn<F>(&self, future: F) -> ()
where
F: Future + Send + 'static,
F::Output: Send + 'static;
fn block_on<F>(&self, future: F) -> F::Output
where
F: std::future::Future;
}

32
src/runner/tokio.rs Normal file
View File

@@ -0,0 +1,32 @@
use std::sync::Arc;
use super::Runner;
pub struct TokioRunner(tokio::runtime::Runtime);
impl Runner for TokioRunner {
fn new() -> Arc<Self> {
Self(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("Failed building the Runtime"),
)
.into()
}
fn block_on<F>(&self, future: F) -> F::Output
where
F: Future,
{
self.0.block_on(future)
}
fn spawn<F>(&self, future: F)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.0.spawn(future);
}
}

View File

@@ -0,0 +1,297 @@
use std::{pin::Pin, sync::Arc};
use actix_web::http::StatusCode;
use bytes::Bytes;
use futures_util::StreamExt;
use openssl::ssl::{NameType, SniError, SslAcceptor, SslAcceptorBuilder, SslFiletype, SslMethod};
use tokio::sync::mpsc;
use crate::settings::Result;
use super::{
Context, HttpRequest, HttpResponse, ServerListenerService, ServerProxyService,
ServerSocketProxyService, Socket, SocketMessage, SocketReciever, SocketSender,
};
pub struct ActixListener;
impl ActixListener {}
#[async_trait::async_trait]
impl ServerListenerService for ActixListener {
async fn new() -> Arc<Self> {
Self.into()
}
async fn listen<T1, T2>(
&self,
proxy: Arc<T1>,
bind_addr: &str,
https: bool,
context: Context,
) -> Result<()>
where
T1: ServerProxyService,
T2: ServerSocketProxyService,
{
let node = ActixListenerNode::<T1, T2>::new(proxy, context.clone());
loop {
let node = node.clone();
let app = actix_web::HttpServer::new(move || {
let app = actix_web::App::new().default_service(actix_web::web::to(node.clone()));
if https {
app
} else {
app.service(acme_challenge)
}
});
let server = if https {
// app.bind_openssl(bind_addr, todo!())?
app.bind_openssl(bind_addr, build_ssl_acceptor(context.clone()))?
} else {
app.bind(bind_addr)?
}
.run();
server.await?;
}
}
}
struct ActixListenerNode<T1, T2>
where
T1: ServerProxyService,
T2: ServerSocketProxyService,
{
proxy: Arc<T1>,
context: Context,
_marker: std::marker::PhantomData<T2>,
}
impl<T1, T2> Clone for ActixListenerNode<T1, T2>
where
T1: ServerProxyService,
T2: ServerSocketProxyService,
{
fn clone(&self) -> Self {
Self {
proxy: self.proxy.clone(),
context: self.context.clone(),
_marker: std::marker::PhantomData,
}
}
}
impl<T1, T2> ActixListenerNode<T1, T2>
where
T1: ServerProxyService,
T2: ServerSocketProxyService,
{
fn new(proxy: Arc<T1>, context: Context) -> Self {
Self {
proxy,
context,
_marker: std::marker::PhantomData,
}
.into()
}
async fn unified_service(
&self,
req: actix_web::HttpRequest,
payload: actix_web::web::Payload,
) -> actix_web::HttpResponse {
let connection = self
.context
.get_connection(req.connection_info().host())
.await;
let res = if let Some(connection) = connection {
if actix_http::ws::handshake(req.head()).is_ok() {
let Ok((res, ses, stream)) = actix_ws::handle(&req, payload) else {
return HttpResponse::internal_server_error().into();
};
let Ok(backend) = T2::connect(req.into(), connection).await else {
let _ = ses.close(None).await;
return HttpResponse::internal_server_error().into();
};
actix_web::rt::spawn(async move {
backend
.proxy_loop(Socket {
sender: ActixSocketSender(ses),
reciever: ActixSocketReciever::from(stream),
})
.await
});
res
} else {
HttpResponse::from(
self.proxy
.fetch(
HttpRequest::from(req).set_body(payload.to_bytes().await.ok()),
connection,
)
.await,
)
.into()
}
} else {
HttpResponse::not_found().into()
};
res.into()
}
}
impl<T1, T2> actix_web::Handler<(actix_web::HttpRequest, actix_web::web::Payload)>
for ActixListenerNode<T1, T2>
where
T1: ServerProxyService,
T2: ServerSocketProxyService,
{
type Output = actix_web::HttpResponse;
type Future = Pin<Box<dyn Future<Output = actix_web::HttpResponse>>>;
fn call(&self, args: (actix_web::HttpRequest, actix_web::web::Payload)) -> Self::Future {
let this = Arc::new(self.clone());
Box::pin(async move { this.unified_service(args.0, args.1).await })
}
}
impl From<actix_web::HttpRequest> for HttpRequest {
fn from(req: actix_web::HttpRequest) -> Self {
let method = req.method().clone().to_string();
let path = req
.uri()
.path_and_query()
.map(|x| x.to_string())
.unwrap_or_else(|| req.uri().path().to_string());
let headers: Vec<(String, String)> = req
.headers()
.iter()
.map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
.collect();
Self {
method,
path,
headers,
body: Bytes::new(),
}
}
}
impl Into<actix_web::HttpResponse> for HttpResponse {
fn into(self) -> actix_web::HttpResponse {
let mut builder = actix_web::HttpResponse::build(
StatusCode::from_u16(self.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
);
for (key, value) in self.headers {
builder.insert_header((key, value));
}
builder.body(self.body)
}
}
pub struct ActixSocketSender(actix_ws::Session);
#[async_trait::async_trait]
impl SocketSender for ActixSocketSender {
async fn send(mut self, msg: SocketMessage) -> Result<Self> {
match msg {
SocketMessage::Text(text) => {
self.0.text(text).await?;
Ok(self)
}
SocketMessage::Binary(bin) => {
self.0.binary(bin).await?;
Ok(self)
}
SocketMessage::Ping(payload) => {
self.0.ping(&payload).await?;
Ok(self)
}
SocketMessage::Pong(payload) => {
self.0.pong(&payload).await?;
Ok(self)
}
SocketMessage::Close => {
self.0.close(None).await?;
Err("closed".into())
}
SocketMessage::None => Ok(self),
}
}
}
pub struct ActixSocketReciever(mpsc::UnboundedReceiver<SocketMessage>);
impl From<actix_ws::MessageStream> for ActixSocketReciever {
fn from(mut stream: actix_ws::MessageStream) -> Self {
let (tx, rx) = mpsc::unbounded_channel();
tokio::task::spawn_local(async move {
while let Some(item) = stream.next().await {
let msg = match item {
Ok(actix_ws::Message::Text(text)) => SocketMessage::Text(text.to_string()),
Ok(actix_ws::Message::Binary(bin)) => SocketMessage::Binary(bin),
Ok(actix_ws::Message::Ping(bin)) => SocketMessage::Ping(bin),
Ok(actix_ws::Message::Pong(bin)) => SocketMessage::Pong(bin),
Ok(actix_ws::Message::Close(_)) => SocketMessage::Close,
_ => SocketMessage::None,
};
// ignore send error (receiver closed)
let _ = tx.send(msg);
}
});
Self(rx)
}
}
#[async_trait::async_trait]
impl SocketReciever for ActixSocketReciever {
async fn next(&mut self) -> Option<SocketMessage> {
self.0.recv().await
}
}
#[actix_web::get("/.well-known/acme-challenge/{token}")]
async fn acme_challenge(args: actix_web::web::Path<(String,)>) -> actix_web::HttpResponse {
let token = args.into_inner().0;
if let Ok(body) = tokio::fs::read(format!(
"./data/acme-webroot/.well-known/acme-challenge/{token}"
))
.await
{
actix_web::HttpResponse::Ok()
.content_type("text/plain")
.body(body)
} else {
actix_web::HttpResponse::NotFound().finish()
}
}
fn build_ssl_acceptor(context: Context) -> SslAcceptorBuilder {
let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
builder.set_servername_callback(move |ssl, _| {
if let Some(server_name) = ssl.servername(NameType::HOST_NAME) {
if let Some((key_path, cert_path)) = context.clone().get_cert(server_name) {
// replace certificate dynamically
ssl.set_private_key_file(key_path, SslFiletype::PEM)
.map_err(|_| SniError::ALERT_FATAL)?;
ssl.set_certificate_chain_file(cert_path)
.map_err(|_| SniError::ALERT_FATAL)?;
return Ok(());
}
}
Err(SniError::ALERT_FATAL)
});
builder
}

245
src/server/mod.rs Normal file
View File

@@ -0,0 +1,245 @@
pub mod actix_listener;
pub mod reqwest_proxy;
pub mod tokio_tungstenite_socket_proxy;
use std::sync::Arc;
use bytes::Bytes;
use crate::{context::Context, log_err, models::Connection, service::Service, settings::Result};
pub struct ServerService<T1, T2, T3>
where
T1: ServerListenerService,
T2: ServerProxyService,
T3: ServerSocketProxyService,
{
https: bool,
_marker: std::marker::PhantomData<(T1, T2, T3)>,
}
#[async_trait::async_trait]
impl<T1, T2, T3> Service for ServerService<T1, T2, T3>
where
T1: ServerListenerService,
T2: ServerProxyService,
T3: ServerSocketProxyService,
{
fn new() -> Arc<Self>
where
Self: Sized,
{
Self {
https: false,
_marker: std::marker::PhantomData,
}
.into()
}
async fn run_inner(self: Arc<Self>, context: Context) -> Result<()> {
let listener = T1::new().await;
let proxy = T2::new().await;
if self.https {
listener
.listen::<T2, T3>(proxy, "0.0.0.0:443", true, context)
.await
} else {
listener
.listen::<T2, T3>(proxy, "0.0.0.0:80", false, context)
.await
}
}
fn service_name(&self) -> &'static str {
if self.https {
"Https server"
} else {
"Http server"
}
}
}
impl<T1, T2, T3> ServerService<T1, T2, T3>
where
T1: ServerListenerService,
T2: ServerProxyService,
T3: ServerSocketProxyService,
{
pub fn https(&self) -> Arc<Self> {
Self {
https: true,
_marker: std::marker::PhantomData,
}
.into()
}
}
#[async_trait::async_trait]
pub trait ServerListenerService: Send + Sync + 'static + Sized {
async fn new() -> Arc<Self>;
async fn listen<T1, T2>(
&self,
proxy: Arc<T1>,
bind_addr: &str,
https: bool,
context: Context,
) -> Result<()>
where
T1: ServerProxyService,
T2: ServerSocketProxyService;
}
#[async_trait::async_trait]
pub trait ServerProxyService: Send + Sync + 'static + Sized {
async fn new() -> Arc<Self>;
async fn fetch(&self, req: HttpRequest, connection: Connection) -> Result<HttpResponse>;
}
pub struct HttpRequest {
pub method: String,
pub path: String,
pub headers: Vec<(String, String)>,
pub body: Bytes,
}
impl HttpRequest {
pub fn new(method: String, path: String, headers: Vec<(String, String)>, body: Bytes) -> Self {
Self {
method,
path,
headers,
body,
}
}
pub fn set_body(mut self, body: Option<Bytes>) -> Self {
if let Some(body) = body {
self.body = body;
}
self
}
}
pub struct HttpResponse {
pub status: u16,
pub headers: Vec<(String, String)>,
pub body: Bytes,
}
impl HttpResponse {
pub fn new(status: u16, headers: Vec<(String, String)>, body: Bytes) -> Self {
Self {
status,
headers,
body,
}
}
pub fn internal_server_error() -> Self {
Self::new(500, vec![], Bytes::new())
}
pub fn not_found() -> Self {
Self::new(404, vec![], Bytes::new())
}
}
impl From<Result<HttpResponse>> for HttpResponse {
fn from(value: Result<HttpResponse>) -> Self {
match value {
Ok(response) => response,
Err(e) => {
log_err!("Server proxy error: {}", e);
HttpResponse::internal_server_error()
}
}
}
}
pub struct Socket<T1, T2>
where
T1: SocketSender,
T2: SocketReciever,
{
pub sender: T1,
pub reciever: T2,
}
#[async_trait::async_trait]
pub trait SocketSender: Send + 'static + Sized {
async fn send(mut self, msg: SocketMessage) -> Result<Self>;
}
#[async_trait::async_trait]
pub trait SocketReciever: Send + 'static + Sized {
async fn next(&mut self) -> Option<SocketMessage>;
}
#[async_trait::async_trait]
pub trait ServerSocketProxyService: Sync + Send + 'static + Sized {
type Sender: SocketSender;
type Reciever: SocketReciever;
async fn connect(req: HttpRequest, connection: Connection) -> Result<Self>;
fn get_socket(self) -> Socket<Self::Sender, Self::Reciever>;
async fn proxy_loop<T1, T2>(self, client: Socket<T1, T2>) -> Result<()>
where
T1: SocketSender,
T2: SocketReciever,
{
let backend = self.get_socket();
let mut reciever = client.reciever;
let mut sender = backend.sender;
let c2b = async move {
while let Some(msg) = reciever.next().await {
match msg {
SocketMessage::Close => {
let _ = sender.send(SocketMessage::Close).await;
break;
}
SocketMessage::None => {}
msg => match sender.send(msg).await {
Ok(s) => sender = s,
Err(e) => {
log_err!("WebSocket proxy send error: {}", e);
break;
}
},
}
}
};
let mut sender = client.sender;
let mut reciever = backend.reciever;
let b2c = async move {
while let Some(msg) = reciever.next().await {
match msg {
SocketMessage::Close => {
let _ = sender.send(SocketMessage::Close).await;
break;
}
SocketMessage::None => {}
msg => match sender.send(msg).await {
Ok(s) => sender = s,
Err(e) => {
log_err!("WebSocket proxy send error: {}", e);
break;
}
},
}
}
};
futures_util::future::join(c2b, b2c).await;
Ok(())
}
}
pub enum SocketMessage {
Text(String),
Binary(Bytes),
Ping(Bytes),
Pong(Bytes),
Close,
None,
}

View File

@@ -0,0 +1,56 @@
use std::{str::FromStr, sync::Arc};
use bytes::Bytes;
use reqwest::{
Method,
header::{HeaderMap, HeaderName, HeaderValue},
};
use crate::settings::Result;
use super::{Connection, HttpRequest, HttpResponse, ServerProxyService};
pub struct ReqwestProxy {
client: reqwest::Client,
}
#[async_trait::async_trait]
impl ServerProxyService for ReqwestProxy {
async fn new() -> Arc<Self> {
Self {
client: reqwest::Client::new(),
}
.into()
}
async fn fetch(&self, req: HttpRequest, connection: Connection) -> Result<HttpResponse> {
let mut headers = HeaderMap::new();
for (key, value) in req.headers {
headers.insert(key.parse::<HeaderName>()?, value.parse::<HeaderValue>()?);
}
let res = self
.client
.request(
Method::from_str(&req.method)?,
format!(
"{}://{}:{}{}",
connection.protocol, connection.ip_address, connection.port, req.path
),
)
.headers(headers)
.body(req.body)
.send()
.await?;
let headers = res
.headers()
.iter()
.map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
.collect();
Ok(HttpResponse::new(
res.status().as_u16(),
headers,
res.bytes().await.ok().unwrap_or_else(|| Bytes::new()),
))
}
}

View File

@@ -0,0 +1,93 @@
use futures_util::{
SinkExt, StreamExt,
stream::{SplitSink, SplitStream},
};
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Message};
use crate::settings::Result;
use super::{
Connection, HttpRequest, ServerSocketProxyService, Socket, SocketMessage, SocketReciever,
SocketSender,
};
pub struct TokioTungsteniteSocketProxy(
Socket<TokioTungsteniteSocketSender, TokioTungsteniteSocketReciever>,
);
#[async_trait::async_trait]
impl ServerSocketProxyService for TokioTungsteniteSocketProxy {
type Sender = TokioTungsteniteSocketSender;
type Reciever = TokioTungsteniteSocketReciever;
async fn connect(req: HttpRequest, connection: Connection) -> Result<Self> {
let url = format!(
"{}://{}:{}{}",
connection.protocol.replace("http", "ws"),
connection.ip_address,
connection.port,
req.path
);
let (ws_stream, _) = connect_async(url).await?;
let (ses, stream) = ws_stream.split();
Ok(Self(Socket {
sender: TokioTungsteniteSocketSender(ses),
reciever: TokioTungsteniteSocketReciever(stream),
}))
}
fn get_socket(self) -> Socket<Self::Sender, Self::Reciever> {
self.0
}
}
pub struct TokioTungsteniteSocketSender(
SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
);
#[async_trait::async_trait]
impl SocketSender for TokioTungsteniteSocketSender {
async fn send(mut self, msg: SocketMessage) -> Result<Self> {
match msg {
SocketMessage::Text(text) => {
self.0.send(Message::Text(text.into())).await?;
Ok(self)
}
SocketMessage::Binary(bin) => {
self.0.send(Message::Binary(bin)).await?;
Ok(self)
}
SocketMessage::Ping(payload) => {
self.0.send(Message::Ping(payload)).await?;
Ok(self)
}
SocketMessage::Pong(payload) => {
self.0.send(Message::Pong(payload)).await?;
Ok(self)
}
SocketMessage::Close => {
self.0.close().await?;
Err("closed".into())
}
SocketMessage::None => Ok(self),
}
}
}
pub struct TokioTungsteniteSocketReciever(SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>);
#[async_trait::async_trait]
impl SocketReciever for TokioTungsteniteSocketReciever {
async fn next(&mut self) -> Option<SocketMessage> {
self.0.next().await.map(|x| match x {
Ok(Message::Text(text)) => SocketMessage::Text(text.to_string()),
Ok(Message::Binary(bin)) => SocketMessage::Binary(bin),
Ok(Message::Ping(bin)) => SocketMessage::Ping(bin),
Ok(Message::Pong(bin)) => SocketMessage::Pong(bin),
Ok(Message::Close(_)) => SocketMessage::Close,
_ => SocketMessage::None,
})
}
}

24
src/service.rs Normal file
View File

@@ -0,0 +1,24 @@
use std::sync::Arc;
use crate::{context::Context, log_err, log_warn, settings::Result};
#[async_trait::async_trait]
pub trait Service: Sync + Send + 'static {
fn new() -> Arc<Self>
where
Self: Sized;
async fn run(self: Arc<Self>, context: Context) {
let mut code = 0;
if let Err(e) = self.clone().run_inner(context.clone()).await {
log_err!("{} error: {}", self.service_name(), e);
code = 1;
}
log_warn!("{} stopped: exiting...", self.service_name());
std::process::exit(code);
}
async fn run_inner(self: Arc<Self>, context: Context) -> Result<()>;
fn service_name(&self) -> &'static str;
}

36
src/settings.rs Normal file
View File

@@ -0,0 +1,36 @@
// Runner
pub use crate::runner::tokio::TokioRunner as CurrentRunner;
pub use tokio::{
process::Command as AsyncCommand,
sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard},
};
// Docker
pub type Docker = crate::docker::DockerService<crate::docker::bollard::BollardDocker>;
// Server
pub type Server = crate::server::ServerService<
crate::server::actix_listener::ActixListener,
crate::server::reqwest_proxy::ReqwestProxy,
crate::server::tokio_tungstenite_socket_proxy::TokioTungsteniteSocketProxy,
>;
// Tls
pub type Tls = crate::tls::TlsService<
crate::tls::certbot_updater::CertbotUpdater,
crate::tls::ssl_expiration_checker::SslExpirationChecker,
>;
// Check and Test tools
pub mod tools {
pub type Check = crate::check::CheckService;
pub type Test = crate::test::TestService;
}
// Error
pub type Error = Box<dyn std::error::Error + Send + Sync>;
pub type Result<T> = std::result::Result<T, Error>;
pub async fn async_sleep(duration: std::time::Duration) {
tokio::time::sleep(duration).await;
}

24
src/test/mod.rs Normal file
View File

@@ -0,0 +1,24 @@
use std::sync::Arc;
use crate::{context::Context, log, service::Service, settings::Result};
pub struct TestService;
#[async_trait::async_trait]
impl Service for TestService {
fn new() -> Arc<Self>
where
Self: Sized,
{
Self.into()
}
async fn run_inner(self: Arc<Self>, _context: Context) -> Result<()> {
log!("No tests avaliable...");
Ok(())
}
fn service_name(&self) -> &'static str {
"Test"
}
}

View File

@@ -0,0 +1,53 @@
use std::{fs, sync::Arc};
use crate::settings::{AsyncCommand, Result};
use super::TlsUpdaterService;
pub struct CertbotUpdater;
#[async_trait::async_trait]
impl TlsUpdaterService for CertbotUpdater {
fn new() -> Arc<Self> {
Self.into()
}
async fn update_certificate(&self, email: &str, domain: &str) -> Result<(String, String)> {
if AsyncCommand::new("certbot")
.arg("certonly")
.arg("--webroot")
.arg("-w")
.arg("/home/data/acme-webroot")
.arg("--cert-name")
.arg(domain)
.arg("-d")
.arg(domain)
.arg("--email")
.arg(email)
.arg("--agree-tos")
.arg("--non-interactive")
.spawn()?
.wait()
.await?
.success()
{
self.get_certificate(domain).await
} else {
Err("Failed to update certificate".into())
}
}
async fn get_certificate(&self, domain: &str) -> Result<(String, String)> {
let key_path = format!("/etc/letsencrypt/live/{domain}/privkey.pem");
if !fs::exists(&key_path).unwrap_or_default() {
return Err(format!("{key_path} not found").into());
}
let cert_path = format!("/etc/letsencrypt/live/{domain}/fullchain.pem");
if !fs::exists(&cert_path).unwrap_or_default() {
return Err(format!("{cert_path} not found").into());
}
Ok((key_path, cert_path))
}
}

133
src/tls/mod.rs Normal file
View File

@@ -0,0 +1,133 @@
pub mod certbot_updater;
pub mod ssl_expiration_checker;
use std::{env, sync::Arc, time::Duration};
use ormlite::Model;
use crate::{
context::{Context, Event},
log_err,
models::Connection,
service::Service,
settings::{Result, async_sleep},
};
#[derive(Clone)]
pub struct TlsService<T1, T2>
where
T1: TlsUpdaterService,
T2: TlsCheckerService,
{
updater: Arc<T1>,
checker: Arc<T2>,
email: String,
}
#[async_trait::async_trait]
impl<T1, T2> Service for TlsService<T1, T2>
where
T1: TlsUpdaterService,
T2: TlsCheckerService,
{
fn new() -> Arc<Self>
where
Self: Sized,
{
Self {
updater: T1::new(),
checker: T2::new(),
email: env::var("EMAIL")
.expect(
"For Tls service to work, you must specify the 'EMAIL' environment variable.",
)
.trim()
.to_owned(),
}
.into()
}
async fn run_inner(self: Arc<Self>, context: Context) -> Result<()> {
context.on_update_ssl(self.clone()).await;
let hour = Duration::from_secs(60 * 60);
loop {
if let Err(err) = self.invoke(&context).await {
log_err!("Tls error: {}", err);
}
async_sleep(hour).await;
}
}
fn service_name(&self) -> &'static str {
"Tls"
}
}
#[async_trait::async_trait]
impl<T1, T2> Event for TlsService<T1, T2>
where
T1: TlsUpdaterService,
T2: TlsCheckerService,
{
async fn invoke(&self, context: &Context) -> Result<()> {
let mut conn = context.db().await;
let connections = Connection::select().fetch_all(&mut *conn).await?;
drop(conn);
for domain in connections.iter().map(|x| x.domain.as_str()) {
if context.get_cert(domain).is_none() {
match if let Ok(cert) = self.updater.get_certificate(domain).await {
Ok(cert)
} else {
self.updater.update_certificate(&self.email, domain).await
} {
Ok(cert) => {
context.add_cert(domain.to_owned(), cert).await;
}
Err(e) => {
log_err!("Failed to get certificate for {}: {}", domain, e);
}
}
} else {
if self.checker.check(domain).await.is_err() {
match self.updater.update_certificate(&self.email, domain).await {
Ok(cert) => {
context.add_cert(domain.to_owned(), cert).await;
}
Err(e) => {
log_err!("Failed to update certificate for {}: {}", domain, e);
}
}
}
}
}
Ok(())
}
}
#[async_trait::async_trait]
pub trait TlsUpdaterService: Send + Sync + 'static + Sized {
fn new() -> Arc<Self>;
async fn update_certificate(&self, email: &str, domain: &str) -> Result<(String, String)>;
async fn get_certificate(&self, domain: &str) -> Result<(String, String)>;
}
pub struct NeedsRenewal;
impl<T: std::error::Error> From<T> for NeedsRenewal {
fn from(_: T) -> Self {
NeedsRenewal
}
}
#[async_trait::async_trait]
pub trait TlsCheckerService: Send + Sync + 'static + Sized {
fn new() -> Arc<Self>;
async fn check(&self, domain: &str) -> std::result::Result<(), NeedsRenewal>;
}

View File

@@ -0,0 +1,20 @@
use std::sync::Arc;
use super::{NeedsRenewal, TlsCheckerService};
pub struct SslExpirationChecker;
#[async_trait::async_trait]
impl TlsCheckerService for SslExpirationChecker {
fn new() -> Arc<Self> {
Self.into()
}
async fn check(&self, domain: &str) -> Result<(), NeedsRenewal> {
let expires = ssl_expiration2::SslExpiration::from_domain_name(domain)?;
if expires.days() < 7 {
Err(NeedsRenewal)
} else {
Ok(())
}
}
}