Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mania-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ pub fn handle_event(attr: TokenStream, item: TokenStream) -> TokenStream {

fn #wrapper_fn_name<'a>(
event: &'a mut dyn crate::core::event::ServerEvent,
handle: std::sync::Arc<crate::core::business::BusinessHandle>,
handle: std::sync::Arc<crate::core::business::BusinessHandle<()>>,
flow: crate::core::business::LogicFlow,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<&'a dyn ServerEvent, crate::core::business::BusinessError>> + Send + 'a>> {
Box::pin(#fn_name(event, handle, flow))
Expand Down
115 changes: 62 additions & 53 deletions mania/src/core/business.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl Display for LogicFlow {

type LogicHandleFn = for<'a> fn(
&'a mut dyn ServerEvent,
Arc<BusinessHandle>,
Arc<BusinessHandle<()>>,
LogicFlow,
) -> Pin<
Box<dyn Future<Output = Result<&'a dyn ServerEvent, BusinessError>> + Send + 'a>,
Expand Down Expand Up @@ -91,31 +91,36 @@ static LOGIC_MAP: Lazy<LogicHandlerMap> = Lazy::new(|| {
map
});

pub async fn dispatch_logic(
pub async fn dispatch_logic<H>(
event: &mut dyn ServerEvent,
handle: Arc<BusinessHandle>,
handle: Arc<BusinessHandle<H>>,
flow: LogicFlow,
) -> Result<&dyn ServerEvent, BusinessError> {
let tid = event.as_any().type_id();
if let Some(fns) = LOGIC_MAP.get(&tid) {
tracing::trace!("[{}] Found {} handlers for {:?}.", flow, fns.len(), event);
for handle_fn in fns.iter() {
handle_fn(event, handle.to_owned(), flow).await?;
}
} else {
tracing::trace!("[{}] No handler found for {:?}", flow, event);
}
Ok(event)
todo!()
//let tid = event.as_any().type_id();
//if let Some(fns) = LOGIC_MAP.get(&tid) {
// tracing::trace!("[{}] Found {} handlers for {:?}.", flow, fns.len(), event);
// for handle_fn in fns.iter() {
// handle_fn(event, handle.to_owned(), flow).await?;
// }
//} else {
// tracing::trace!("[{}] No handler found for {:?}", flow, event);
//}
//Ok(event)
}

pub struct Business {
pub struct Business<H> {
addr: SocketAddr,
receiver: PacketReceiver,
handle: Arc<BusinessHandle>,
handle: Arc<BusinessHandle<H>>,
}

impl Business {
pub async fn new(config: Arc<ClientConfig>, context: Arc<Context>) -> BusinessResult<Self> {
impl<H> Business<H> {
pub async fn new(
config: Arc<ClientConfig>,
context: Context,
handler: H,
) -> BusinessResult<Self> {
let addr = optimum_server(config.get_optimum_server, config.use_ipv6_network).await?;
let (sender, receiver) = socket::connect(addr).await?;
let event_dispatcher = EventDispatcher::new();
Expand All @@ -126,6 +131,7 @@ impl Business {
pending_requests: DashMap::new(),
context,
cache: Arc::new(Cache::new(config.cache_mode)), // TODO: construct from context
event_handler: handler,
event_dispatcher,
event_listener,
highway: Arc::new(Highway::default()),
Expand All @@ -138,10 +144,44 @@ impl Business {
})
}

pub fn handle(&self) -> Arc<BusinessHandle> {
pub fn handle(&self) -> Arc<BusinessHandle<H>> {
self.handle.clone()
}

async fn try_reconnect(&mut self) -> BusinessResult<()> {
tracing::info!("Reconnecting to server: {}", self.addr);

let (sender, receiver) = socket::connect(self.addr).await?;
self.handle.sender.store(Arc::new(sender));
self.receiver = receiver;

todo!(
"await Collection.Business.WtExchangeLogic.BotOnline(BotOnlineEvent.OnlineReason.Reconnect);"
)
}

async fn reconnect(&mut self) {
let handle = self.handle.clone();
let reconnecting = handle.set_reconnecting().await;
let mut try_interval = Duration::from_secs(1);
loop {
match self.try_reconnect().await {
Ok(_) => break,
Err(e) => {
tracing::error!("Reconnect failed: {}", e);
tracing::info!("Retrying in {} seconds", try_interval.as_secs());
tokio::time::sleep(try_interval).await;
if try_interval < Duration::from_secs(30) {
try_interval *= 2;
}
}
}
}
drop(reconnecting);
}
}

impl<H: Send + Sync + 'static> Business<H> {
// TODO: decouple
pub async fn spawn(&mut self) {
let handle_packets = async {
Expand Down Expand Up @@ -174,52 +214,21 @@ impl Business {
_ = handle_packets => {}
}
}

async fn try_reconnect(&mut self) -> BusinessResult<()> {
tracing::info!("Reconnecting to server: {}", self.addr);

let (sender, receiver) = socket::connect(self.addr).await?;
self.handle.sender.store(Arc::new(sender));
self.receiver = receiver;

todo!(
"await Collection.Business.WtExchangeLogic.BotOnline(BotOnlineEvent.OnlineReason.Reconnect);"
)
}

async fn reconnect(&mut self) {
let handle = self.handle.clone();
let reconnecting = handle.set_reconnecting().await;
let mut try_interval = Duration::from_secs(1);
loop {
match self.try_reconnect().await {
Ok(_) => break,
Err(e) => {
tracing::error!("Reconnect failed: {}", e);
tracing::info!("Retrying in {} seconds", try_interval.as_secs());
tokio::time::sleep(try_interval).await;
if try_interval < Duration::from_secs(30) {
try_interval *= 2;
}
}
}
}
drop(reconnecting);
}
}

pub struct BusinessHandle {
pub struct BusinessHandle<H> {
sender: ArcSwap<PacketSender>,
reconnecting: Mutex<()>,
pending_requests: DashMap<u32, oneshot::Sender<BusinessResult<CEParse>>>,
pub(crate) context: Arc<Context>,
pub(crate) context: Context,
pub(crate) cache: Arc<Cache>,
event_handler: H,
pub(crate) event_dispatcher: EventDispatcher,
pub event_listener: EventListener,
pub(crate) highway: Arc<Highway>,
}

impl BusinessHandle {
impl<H> BusinessHandle<H> {
/// Wait if the client is reconnecting.
async fn wait_reconnecting(&self) {
drop(self.reconnecting.lock().await);
Expand Down
6 changes: 3 additions & 3 deletions mania/src/core/business/caching_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::Arc;
#[handle_event(GroupSysIncreaseEvent, GroupSysDecreaseEvent)]
async fn caching_logic(
event: &mut dyn ServerEvent,
handle: Arc<BusinessHandle>,
handle: Arc<BusinessHandle<()>>,
flow: LogicFlow,
) -> Result<&dyn ServerEvent, BusinessError> {
match flow {
Expand All @@ -18,9 +18,9 @@ async fn caching_logic(
}
}

async fn caching_logic_incoming(
async fn caching_logic_incoming<H>(
event: &mut dyn ServerEvent,
handle: Arc<BusinessHandle>,
handle: Arc<BusinessHandle<H>>,
) -> &dyn ServerEvent {
match event {
_ if let Some(increase) = event.as_any_mut().downcast_mut::<GroupSysIncreaseEvent>() => {
Expand Down
22 changes: 11 additions & 11 deletions mania/src/core/business/messaging_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ use std::sync::Arc;
FriendSysRequestEvent,
BotSysRenameEvent
)]
async fn messaging_logic(
async fn messaging_logic<H>(
event: &mut dyn ServerEvent,
handle: Arc<BusinessHandle>,
handle: Arc<BusinessHandle<H>>,
flow: LogicFlow,
) -> Result<&dyn ServerEvent, BusinessError> {
tracing::trace!("[{}] Handling event: {:?}", flow, event);
Expand All @@ -89,9 +89,9 @@ async fn messaging_logic(

// FIXME: avoid take things from event
// FIXME: (TODO) make it return Result(?)
async fn messaging_logic_incoming(
async fn messaging_logic_incoming<H>(
event: &mut dyn ServerEvent,
handle: Arc<BusinessHandle>,
handle: Arc<BusinessHandle<H>>,
) -> &dyn ServerEvent {
{
if let Some(msg) = event.as_any_mut().downcast_mut::<PushMessageEvent>() {
Expand Down Expand Up @@ -647,7 +647,7 @@ async fn messaging_logic_incoming(
event
}

async fn resolve_incoming_chain(chain: &mut MessageChain, handle: Arc<BusinessHandle>) {
async fn resolve_incoming_chain<H>(chain: &mut MessageChain, handle: Arc<BusinessHandle<H>>) {
for entity in &mut chain.entities {
match *entity {
Entity::Image(ref mut image) => {
Expand Down Expand Up @@ -845,9 +845,9 @@ async fn resolve_incoming_chain(chain: &mut MessageChain, handle: Arc<BusinessHa
}
}

async fn messaging_logic_outgoing(
async fn messaging_logic_outgoing<H>(
event: &mut dyn ServerEvent,
handle: Arc<BusinessHandle>,
handle: Arc<BusinessHandle<H>>,
) -> Result<&dyn ServerEvent, BusinessError> {
match event {
_ if let Some(send) = event.as_any_mut().downcast_mut::<SendMessageEvent>() => {
Expand All @@ -861,9 +861,9 @@ async fn messaging_logic_outgoing(
}

// TODO: error handling
async fn resolve_outgoing_chain(
async fn resolve_outgoing_chain<H>(
chain: &mut MessageChain,
handle: Arc<BusinessHandle>,
handle: Arc<BusinessHandle<H>>,
) -> Result<(), BusinessError> {
let entities: &mut Vec<Entity> = chain.entities.as_mut();
for entity in entities {
Expand Down Expand Up @@ -908,9 +908,9 @@ async fn resolve_outgoing_chain(
}

// TODO: return result!!!
async fn resolve_chain_metadata(
async fn resolve_chain_metadata<H>(
chain: &mut MessageChain,
handle: Arc<BusinessHandle>,
handle: Arc<BusinessHandle<H>>,
) -> &mut MessageChain {
match chain.typ {
MessageType::Group(ref mut grp)
Expand Down
6 changes: 3 additions & 3 deletions mania/src/core/business/wt_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::sync::Arc;
#[handle_event(KickNTEvent)]
async fn messaging_logic(
event: &mut dyn ServerEvent,
handle: Arc<BusinessHandle>,
handle: Arc<BusinessHandle<()>>,
flow: LogicFlow,
) -> Result<&dyn ServerEvent, BusinessError> {
match flow {
Expand All @@ -19,9 +19,9 @@ async fn messaging_logic(
}
}

async fn messaging_logic_incoming(
async fn messaging_logic_incoming<H>(
event: &mut dyn ServerEvent,
handle: Arc<BusinessHandle>,
handle: Arc<BusinessHandle<H>>,
) -> &dyn ServerEvent {
match event {
_ if let Some(kick) = event.as_any_mut().downcast_mut::<KickNTEvent>() => {
Expand Down
2 changes: 1 addition & 1 deletion mania/src/core/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ static EVENT_MAP: Lazy<EventMap> = Lazy::new(|| {
map
});

pub async fn resolve_event(packet: SsoPacket, context: &Arc<Context>) -> CEParseResult {
pub async fn resolve_event(packet: SsoPacket, context: &Context) -> CEParseResult {
// Lagrange.Core.Internal.Context.ServiceContext.ResolveEventByPacket
let payload = PacketReader::new(packet.payload()).section(|p| p.bytes());
let Some(parse) = EVENT_MAP.get(packet.command()) else {
Expand Down
2 changes: 1 addition & 1 deletion mania/src/core/operation/cache_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;

impl BusinessHandle {
impl<H> BusinessHandle<H> {
pub async fn uin2uid(
self: &Arc<Self>,
uin: u32,
Expand Down
2 changes: 1 addition & 1 deletion mania/src/core/operation/common_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use futures::future::join_all;
use std::sync::Arc;
use tokio::join;

impl BusinessHandle {
impl<H> BusinessHandle<H> {
pub async fn fetch_rkey(self: &Arc<Self>) -> ManiaResult<()> {
let mut fetch_event = FetchRKeyEvent {};
let res = self.send_event(&mut fetch_event).await?;
Expand Down
2 changes: 1 addition & 1 deletion mania/src/core/operation/highway_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use std::io::Cursor;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncSeekExt};

impl BusinessHandle {
impl<H> BusinessHandle<H> {
async fn fetch_sig_session(self: &Arc<Self>) -> ManiaResult<Bytes> {
let mut req = FetchHighwayTicketEvent::default();
let req = self.send_event(&mut req).await?;
Expand Down
4 changes: 3 additions & 1 deletion mania/src/core/operation/wt_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::time::Duration;
use tokio::sync::watch;
use tokio::time::{sleep, timeout};

impl BusinessHandle {
impl<H> BusinessHandle<H> {
pub fn update_key_store(&self) -> &KeyStore {
&self.context.key_store
}
Expand Down Expand Up @@ -182,7 +182,9 @@ impl BusinessHandle {
)),
}
}
}

impl<H: Send + Sync + 'static> BusinessHandle<H> {
pub async fn online(self: &Arc<Self>) -> ManiaResult<watch::Sender<()>> {
let (tx, mut rx) = watch::channel::<()>(());
let res = self.send_event(&mut InfoSyncEvent).await?;
Expand Down
6 changes: 6 additions & 0 deletions mania/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ pub mod system;

pub trait ManiaEvent: std::fmt::Debug {}

use crate::event::system::bot_offline;

pub trait EventHandler {
fn on_bot_offline(&self, ev: bot_offline::BotOfflineEvent);
}

pub(crate) struct EventDispatcher {
pub(crate) system: watch::Sender<Option<system::SystemEvent>>,
pub(crate) friend: watch::Sender<Option<friend::FriendEvent>>,
Expand Down
Loading
Loading