diff --git a/anvil/src/xwayland/mod.rs b/anvil/src/xwayland/mod.rs index a2d2b44..f876dba 100644 --- a/anvil/src/xwayland/mod.rs +++ b/anvil/src/xwayland/mod.rs @@ -1,4 +1,6 @@ -use std::{cell::RefCell, collections::HashMap, convert::TryFrom, os::unix::net::UnixStream, rc::Rc}; +use std::{ + cell::RefCell, collections::HashMap, convert::TryFrom, os::unix::net::UnixStream, rc::Rc, sync::Arc, +}; use smithay::{ reexports::wayland_server::{protocol::wl_surface::WlSurface, Client}, @@ -40,13 +42,13 @@ impl AnvilState { let (wm, source) = X11State::start_wm(connection, self.window_map.clone(), self.log.clone()).unwrap(); let wm = Rc::new(RefCell::new(wm)); client.data_map().insert_if_missing(|| Rc::clone(&wm)); + let log = self.log.clone(); self.handle - .insert_source(source, move |events, _, _| { - let mut wm = wm.borrow_mut(); - for event in events.into_iter() { - wm.handle_event(event, &client)?; + .insert_source(source, move |event, _, _| { + match wm.borrow_mut().handle_event(event, &client) { + Ok(()) => {} + Err(err) => error!(log, "Error while handling X11 event: {}", err), } - Ok(()) }) .unwrap(); } @@ -60,12 +62,13 @@ x11rb::atom_manager! { Atoms: AtomsCookie { WM_S0, WL_SURFACE_ID, + _ANVIL_CLOSE_CONNECTION, } } /// The actual runtime state of the XWayland integration. struct X11State { - conn: Rc, + conn: Arc, atoms: Atoms, log: slog::Logger, unpaired_surfaces: HashMap)>, @@ -115,16 +118,16 @@ impl X11State { conn.flush()?; - let conn = Rc::new(conn); + let conn = Arc::new(conn); let wm = Self { - conn: Rc::clone(&conn), + conn: Arc::clone(&conn), atoms, unpaired_surfaces: Default::default(), window_map, - log, + log: log.clone(), }; - Ok((wm, X11Source::new(conn))) + Ok((wm, X11Source::new(conn, win, atoms._ANVIL_CLOSE_CONNECTION, log))) } fn handle_event(&mut self, event: Event, client: &Client) -> Result<(), ReplyOrIdError> { @@ -199,6 +202,7 @@ impl X11State { } _ => {} } + self.conn.flush()?; Ok(()) } diff --git a/anvil/src/xwayland/x11rb_event_source.rs b/anvil/src/xwayland/x11rb_event_source.rs index f3991e4..694b434 100644 --- a/anvil/src/xwayland/x11rb_event_source.rs +++ b/anvil/src/xwayland/x11rb_event_source.rs @@ -1,35 +1,102 @@ use std::{ - io::{Error as IOError, ErrorKind, Result as IOResult}, - os::unix::io::AsRawFd, - rc::Rc, + io::Result as IOResult, + sync::Arc, + thread::{spawn, JoinHandle}, }; use x11rb::{ - connection::Connection as _, errors::ReplyOrIdError, protocol::Event, rust_connection::RustConnection, + connection::Connection as _, + protocol::{ + xproto::{Atom, ClientMessageEvent, ConnectionExt as _, EventMask, Window, CLIENT_MESSAGE_EVENT}, + Event, + }, + rust_connection::RustConnection, }; use smithay::reexports::calloop::{ - generic::{Fd, Generic}, - EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory, + channel::{sync_channel, Channel, Event as ChannelEvent, SyncSender}, + EventSource, Poll, PostAction, Readiness, Token, TokenFactory, }; +/// Integration of an x11rb X11 connection with calloop. +/// +/// This is a thin wrapper around `Channel`. It works by spawning an extra thread reads events from +/// the X11 connection and then sends them across the channel. +/// +/// See [1] for why this extra thread is necessary. The single-thread solution proposed on that +/// page does not work with calloop, since it requires checking something on every main loop +/// iteration. Calloop only allows "when an FD becomes readable". +/// +/// [1]: https://docs.rs/x11rb/0.8.1/x11rb/event_loop_integration/index.html#threads-and-races pub struct X11Source { - connection: Rc, - generic: Generic, + connection: Arc, + channel: Channel, + event_thread: Option>, + close_window: Window, + close_type: Atom, + log: slog::Logger, } impl X11Source { - pub fn new(connection: Rc) -> Self { - let fd = Fd(connection.stream().as_raw_fd()); - let generic = Generic::new(fd, Interest::READ, Mode::Level); - Self { connection, generic } + /// Create a new X11 source. + /// + /// The returned instance will use `SendRequest` to cause a `ClientMessageEvent` to be sent to + /// the given window with the given type. The expectation is that this is a window that was + /// created by us. Thus, the event reading thread will wake up and check an internal exit flag, + /// then exit. + pub fn new( + connection: Arc, + close_window: Window, + close_type: Atom, + log: slog::Logger, + ) -> Self { + let (sender, channel) = sync_channel(5); + let conn = Arc::clone(&connection); + let log2 = log.clone(); + let event_thread = Some(spawn(move || { + run_event_thread(conn, sender, log2); + })); + Self { + connection, + channel, + event_thread, + close_window, + close_type, + log, + } + } +} + +impl Drop for X11Source { + fn drop(&mut self) { + // Signal the worker thread to exit by dropping the read end of the channel. + // There is no easy and nice way to do this, so do it the ugly way: Replace it. + let (_, channel) = sync_channel(1); + self.channel = channel; + + // Send an event to wake up the worker so that it actually exits + let event = ClientMessageEvent { + response_type: CLIENT_MESSAGE_EVENT, + format: 8, + sequence: 0, + window: self.close_window, + type_: self.close_type, + data: [0; 20].into(), + }; + let _ = self + .connection + .send_event(false, self.close_window, EventMask::NO_EVENT, event); + let _ = self.connection.flush(); + + // Wait for the worker thread to exit + self.event_thread.take().map(|handle| handle.join()); } } impl EventSource for X11Source { - type Event = Vec; + type Event = Event; type Metadata = (); - type Ret = Result<(), ReplyOrIdError>; + type Ret = (); fn process_events( &mut self, @@ -40,36 +107,54 @@ impl EventSource for X11Source { where C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, { - fn inner(conn: &RustConnection, mut callback: C) -> Result<(), ReplyOrIdError> - where - C: FnMut(Vec, &mut ()) -> Result<(), ReplyOrIdError>, - { - let mut events = Vec::new(); - while let Some(event) = conn.poll_for_event()? { - events.push(event); - } - if !events.is_empty() { - callback(events, &mut ())?; - } - conn.flush()?; - Ok(()) - } - let connection = &self.connection; - self.generic.process_events(readiness, token, |_, _| { - inner(connection, &mut callback).map_err(|err| IOError::new(ErrorKind::Other, err))?; - Ok(PostAction::Continue) - }) + let log = self.log.clone(); + self.channel + .process_events(readiness, token, move |event, meta| match event { + ChannelEvent::Closed => slog::warn!(log, "Event thread exited"), + ChannelEvent::Msg(event) => callback(event, meta), + }) } fn register(&mut self, poll: &mut Poll, factory: &mut TokenFactory) -> IOResult<()> { - self.generic.register(poll, factory) + self.channel.register(poll, factory) } fn reregister(&mut self, poll: &mut Poll, factory: &mut TokenFactory) -> IOResult<()> { - self.generic.reregister(poll, factory) + self.channel.reregister(poll, factory) } fn unregister(&mut self, poll: &mut Poll) -> IOResult<()> { - self.generic.unregister(poll) + self.channel.unregister(poll) + } +} + +/// This thread reads X11 events from the connection and sends them on the channel. +/// +/// This is run in an extra thread since sending an X11 request or waiting for the reply to an X11 +/// request can both read X11 events from the underlying socket which are then saved in the +/// RustConnection. Thus, readability of the underlying socket is not enough to guarantee we do not +/// miss wakeups. +/// +/// This thread will call wait_for_event(). RustConnection then ensures internally to wake us up +/// when an event arrives. So far, this seems to be the only safe way to integrate x11rb with +/// calloop. +fn run_event_thread(connection: Arc, sender: SyncSender, log: slog::Logger) { + loop { + let event = match connection.wait_for_event() { + Ok(event) => event, + Err(err) => { + // Connection errors are most likely permanent. Thus, exit the thread. + slog::crit!(log, "Event thread exiting due to connection error {}", err); + break; + } + }; + match sender.send(event) { + Ok(()) => {} + Err(_) => { + // The only possible error is that the other end of the channel was dropped. + // This happens in X11Source's Drop impl. + break; + } + } } }