diff --git a/src/bin/main.rs b/src/bin/main.rs index 51bdf1a..e3a2a83 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -11,6 +11,7 @@ use elseware::entity::character::NewCharacterEntity; use elseware::entity::item::{NewItemEntity, ItemDetail, ItemLocation}; use elseware::entity::item; +use elseware::common::mainloop::*; fn setup_logger() { let colors = fern::colors::ColoredLevelConfig::new() @@ -185,39 +186,28 @@ fn main() { }).await; } - let patch = async_std::task::spawn(async { - info!("[patch] starting server"); - let patch_config = load_config(); - let patch_motd = load_motd(); - let (patch_file_tree, patch_file_lookup) = generate_patch_tree(patch_config.path.as_str()); - let patch_state = PatchServerState::new(patch_file_tree, patch_file_lookup, patch_motd); - - elseware::common::mainloop::mainloop_async(patch_state, patch_config.port).await; - }); + info!("[patch] starting server"); + let patch_config = load_config(); + let patch_motd = load_motd(); + let (patch_file_tree, patch_file_lookup) = generate_patch_tree(patch_config.path.as_str()); + let patch_state = PatchServerState::new(patch_file_tree, patch_file_lookup, patch_motd); + let patch_loop = patch_mainloop(patch_state, patch_config.port); let thread_entity_gateway = entity_gateway.clone(); - let auth = async_std::task::spawn(async { - info!("[auth] starting server"); - let auth_state = LoginServerState::new(thread_entity_gateway); - - elseware::common::mainloop::mainloop_async(auth_state, elseware::login::login::LOGIN_PORT).await; - }); + info!("[auth] starting server"); + let login_state = LoginServerState::new(thread_entity_gateway); + let login_loop = login_mainloop(login_state, elseware::login::login::LOGIN_PORT); let thread_entity_gateway = entity_gateway.clone(); - let character = async_std::task::spawn(async { - info!("[character] starting server"); - let char_state = CharacterServerState::new(thread_entity_gateway); - - elseware::common::mainloop::mainloop_async(char_state, elseware::login::character::CHARACTER_PORT).await; - }); + info!("[character] starting server"); + let char_state = CharacterServerState::new(thread_entity_gateway); + let character_loop = character_mainloop(char_state, elseware::login::character::CHARACTER_PORT); let thread_entity_gateway = entity_gateway.clone(); - let ship = async_std::task::spawn(async { - info!("[ship] starting server"); - let ship_state = ShipServerState::new(thread_entity_gateway); - elseware::common::mainloop::mainloop_async(ship_state, elseware::ship::ship::SHIP_PORT).await; - }); + info!("[ship] starting server"); + let ship_state = ShipServerState::new(thread_entity_gateway); + let ship_loop = ship_mainloop(ship_state, elseware::ship::ship::SHIP_PORT); - futures::join!(patch, auth, character, ship); + futures::future::join_all(vec![patch_loop, login_loop, character_loop, ship_loop]).await; }); } diff --git a/src/common/mainloop.rs b/src/common/mainloop.rs index 38a9c50..0439e0f 100644 --- a/src/common/mainloop.rs +++ b/src/common/mainloop.rs @@ -1,14 +1,24 @@ #![allow(dead_code)] +use std::pin::Pin; +use futures::future::Future; use log::{trace, info, warn}; use async_std::sync::{Arc, Mutex}; use async_std::io::prelude::{ReadExt, WriteExt}; use std::collections::HashMap; +use serde::{Serialize, Deserialize}; +use serde::de::DeserializeOwned; use libpso::crypto::{PSOCipher, NullCipher, CipherError}; use libpso::PacketParseError; use crate::common::serverstate::ClientId; use crate::common::serverstate::{RecvServerPacket, SendServerPacket, ServerState, OnConnect}; +use crate::common::interserver::InterserverActor; +use crate::patch::patch::PatchServerState; +use crate::login::login::LoginServerState; +use crate::login::character::CharacterServerState; +use crate::ship::ship::ShipServerState; +use crate::entity::gateway::entitygateway::EntityGateway; #[derive(Debug)] pub enum NetworkError { @@ -265,7 +275,7 @@ async fn client_send_loop(client_id: ClientId, } -pub async fn mainloop_async(state: STATE, port: u16) where +pub async fn simple_mainloop(state: STATE, port: u16) where STATE: ServerState + Send + 'static, S: SendServerPacket + std::fmt::Debug + Send + Sync + 'static, R: RecvServerPacket + std::fmt::Debug + Send + Sync + 'static, @@ -299,3 +309,135 @@ pub async fn mainloop_async(state: STATE, port: u16) where listener.await } + +//////////////////////////////////////////////////////////// + + + +async fn state_client_loop(state: Arc>, + server_state_receiver: async_std::sync::Receiver, R>>) where + STATE: ServerState + Send + 'static, + S: SendServerPacket + std::fmt::Debug + Send + 'static, + R: RecvServerPacket + std::fmt::Debug + Send + 'static, + E: std::fmt::Debug + Send, +{ + async_std::task::spawn(async move { + let mut clients = HashMap::new(); + + loop { + let action = server_state_receiver.recv().await.unwrap(); + let mut state = state.lock().await; + + match action { + ClientAction::NewClient(client_id, sender) => { + clients.insert(client_id, sender.clone()); + for action in state.on_connect(client_id) { + match action { + OnConnect::Cipher((inc, outc)) => { + sender.send(ServerStateAction::Cipher(inc, outc)).await; + }, + OnConnect::Packet(pkt) => { + sender.send(ServerStateAction::Packet(pkt)).await; + } + } + } + }, + ClientAction::Packet(client_id, pkt) => { + let pkts = state.handle(client_id, &pkt).await; + match pkts { + Ok(pkts) => { + for (client_id, pkt) in pkts { + if let Some(client) = clients.get_mut(&client_id) { + client.send(ServerStateAction::Packet(pkt)).await; + } + } + }, + Err(err) => { + warn!("[client {:?} state handler error] {:?}", client_id, err); + } + } + }, + ClientAction::Disconnect(client_id) => { + let pkts = state.on_disconnect(client_id); + for (client_id, pkt) in pkts { + if let Some(client) = clients.get_mut(&client_id) { + client.send(ServerStateAction::Packet(pkt)).await; + } + } + + if let Some(client) = clients.get_mut(&client_id) { + client.send(ServerStateAction::Disconnect).await; + } + } + } + } + }); +} + + +pub fn client_accept_mainloop(state: Arc>, client_port: u16) -> impl Future +where + STATE: ServerState + Send + 'static, + S: SendServerPacket + std::fmt::Debug + Send + Sync + 'static, + R: RecvServerPacket + std::fmt::Debug + Send + Sync + 'static, + E: std::fmt::Debug + Send, +{ + let listener = async_std::task::spawn(async move { + let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), client_port))).await.unwrap(); + let mut id = 1; + + let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024); + + state_client_loop(state, server_state_receiver).await; + + loop { + let (sock, addr) = listener.accept().await.unwrap(); + let client_id = crate::common::serverstate::ClientId(id); + id += 1; + + info!("new client {:?} {:?} {:?}", client_id, sock, addr); + + let (client_sender, client_receiver) = async_std::sync::channel(64); + let socket = Arc::new(sock); + let cipher_in: Arc>> = Arc::new(Mutex::new(Box::new(NullCipher {}))); + let cipher_out: Arc>> = Arc::new(Mutex::new(Box::new(NullCipher {}))); + + client_recv_loop(client_id, socket.clone(), cipher_in.clone(), server_state_sender.clone(), client_sender).await; + client_send_loop(client_id, socket.clone(), cipher_in.clone(), cipher_out.clone(), client_receiver).await; + } + }); + + listener +} + + + +pub fn patch_mainloop(patch_state: PatchServerState, patch_port: u16) -> Pin>> { + let patch_state = Arc::new(Mutex::new(patch_state)); + let client_mainloop = client_accept_mainloop(patch_state, patch_port); + Box::pin(client_mainloop) +} + +pub fn login_mainloop(login_state: LoginServerState, login_port: u16) -> Pin>> { + let login_state = Arc::new(Mutex::new(login_state)); + let client_mainloop = client_accept_mainloop(login_state.clone(), login_port); + //let ship_communication_mainloop = interserver_listen_mainloop(login_state.clone(), ship_listen_port); + Box::pin(client_mainloop) +} + +pub fn character_mainloop(character_state: CharacterServerState, character_port: u16) -> Pin>> { + let character_state = Arc::new(Mutex::new(character_state)); + let client_mainloop = client_accept_mainloop(character_state, character_port); + Box::pin(client_mainloop) +} + + +pub fn ship_mainloop(ship_state: ShipServerState, ship_port: u16) -> Pin>> { + let ship_state = Arc::new(Mutex::new(ship_state)); + let client_mainloop = client_accept_mainloop(ship_state, ship_port); + //let login_mainloop = ship_to_login_mainloop(ship_state, login_port); + //let admin_mainloop = ship_admin_mainloop(ship_state, admin_port); + + //futures::future::join_all(vec![client_mainloop, login_mainloop, admin_mainloop]) + Box::pin(client_mainloop) +}