From 6004ecdc144aed92a1bd47c4ff5317b0c1486245 Mon Sep 17 00:00:00 2001 From: Uli Schlachter Date: Sat, 14 Aug 2021 17:25:51 +0200 Subject: [PATCH] Anvil: Move X11 event reading to a thread x11rb caches X11 events internally. This cache is used when waiting for the reply of a request and an event is received first. This cache is however also used when sending a request, because the X11 client may never block and stop reading while writing a request. Doing so could cause a deadlock with the X11 server. Thus, this commit changes X11Source to spawn a thread internally. This thread calls wait_for_event() in a loop and uses a calloop channel to forward these events to the main loop. x11rb's RustConnection internally uses a ConditionVariable to make sure this thread will wake up when needed. Signed-off-by: Uli Schlachter --- anvil/src/xwayland/mod.rs | 23 ++--- anvil/src/xwayland/x11rb_event_source.rs | 102 +++++++++++++++-------- 2 files changed, 80 insertions(+), 45 deletions(-) diff --git a/anvil/src/xwayland/mod.rs b/anvil/src/xwayland/mod.rs index a2d2b44..7df9651 100644 --- a/anvil/src/xwayland/mod.rs +++ b/anvil/src/xwayland/mod.rs @@ -1,4 +1,4 @@ -use std::{cell::RefCell, collections::HashMap, convert::TryFrom, os::unix::net::UnixStream, rc::Rc}; +use std::{cell::RefCell, collections::HashMap, convert::TryFrom, os::unix::net::UnixStream, rc::Rc, sync::Arc}; use smithay::{ reexports::wayland_server::{protocol::wl_surface::WlSurface, Client}, @@ -40,13 +40,13 @@ impl AnvilState { let (wm, source) = X11State::start_wm(connection, self.window_map.clone(), self.log.clone()).unwrap(); let wm = Rc::new(RefCell::new(wm)); client.data_map().insert_if_missing(|| Rc::clone(&wm)); + let log = self.log.clone(); self.handle - .insert_source(source, move |events, _, _| { - let mut wm = wm.borrow_mut(); - for event in events.into_iter() { - wm.handle_event(event, &client)?; + .insert_source(source, move |event, _, _| { + match wm.borrow_mut().handle_event(event, &client) { + Ok(()) => {}, + Err(err) => error!(log, "Error while handling X11 event: {}", err), } - Ok(()) }) .unwrap(); } @@ -65,7 +65,7 @@ x11rb::atom_manager! { /// The actual runtime state of the XWayland integration. struct X11State { - conn: Rc, + conn: Arc, atoms: Atoms, log: slog::Logger, unpaired_surfaces: HashMap)>, @@ -115,16 +115,16 @@ impl X11State { conn.flush()?; - let conn = Rc::new(conn); + let conn = Arc::new(conn); let wm = Self { - conn: Rc::clone(&conn), + conn: Arc::clone(&conn), atoms, unpaired_surfaces: Default::default(), window_map, - log, + log: log.clone(), }; - Ok((wm, X11Source::new(conn))) + Ok((wm, X11Source::new(conn, log))) } fn handle_event(&mut self, event: Event, client: &Client) -> Result<(), ReplyOrIdError> { @@ -199,6 +199,7 @@ impl X11State { } _ => {} } + self.conn.flush()?; Ok(()) } diff --git a/anvil/src/xwayland/x11rb_event_source.rs b/anvil/src/xwayland/x11rb_event_source.rs index f3991e4..f401640 100644 --- a/anvil/src/xwayland/x11rb_event_source.rs +++ b/anvil/src/xwayland/x11rb_event_source.rs @@ -1,35 +1,48 @@ use std::{ - io::{Error as IOError, ErrorKind, Result as IOResult}, - os::unix::io::AsRawFd, - rc::Rc, + io::Result as IOResult, + sync::{atomic::{AtomicBool, Ordering}, Arc}, + thread::{spawn, JoinHandle}, }; use x11rb::{ - connection::Connection as _, errors::ReplyOrIdError, protocol::Event, rust_connection::RustConnection, + connection::Connection as _, protocol::{Event, xproto::{CLIENT_MESSAGE_EVENT, Atom, ConnectionExt as _, ClientMessageEvent, EventMask, Window}}, rust_connection::RustConnection, }; use smithay::reexports::calloop::{ - generic::{Fd, Generic}, - EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory, + channel::{sync_channel, Channel, Event as ChannelEvent, SyncSender}, + EventSource, Poll, PostAction, Readiness, Token, TokenFactory, }; +/// Integration of an x11rb X11 connection with calloop. +/// +/// This is a thin wrapper around `Channel`. It works by spawning an extra thread reads events from +/// the X11 connection and then sends them across the channel. +/// +/// See [1] for why this extra thread is necessary. The single-thread solution proposed on that +/// page does not work with calloop, since it requires checking something on every main loop +/// iteration. Calloop only allows "when an FD becomes readable". +/// +/// [1]: https://docs.rs/x11rb/0.8.1/x11rb/event_loop_integration/index.html#threads-and-races pub struct X11Source { - connection: Rc, - generic: Generic, + channel: Channel, } impl X11Source { - pub fn new(connection: Rc) -> Self { - let fd = Fd(connection.stream().as_raw_fd()); - let generic = Generic::new(fd, Interest::READ, Mode::Level); - Self { connection, generic } + /// Create a new X11 source. + pub fn new(connection: Arc, log: slog::Logger) -> Self { + let (sender, channel) = sync_channel(5); + let conn = Arc::clone(&connection); + let event_thread = Some(spawn(move || { + run_event_thread(connection, sender, log); + })); + Self { channel } } } impl EventSource for X11Source { - type Event = Vec; + type Event = Event; type Metadata = (); - type Ret = Result<(), ReplyOrIdError>; + type Ret = (); fn process_events( &mut self, @@ -40,36 +53,57 @@ impl EventSource for X11Source { where C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, { - fn inner(conn: &RustConnection, mut callback: C) -> Result<(), ReplyOrIdError> - where - C: FnMut(Vec, &mut ()) -> Result<(), ReplyOrIdError>, - { - let mut events = Vec::new(); - while let Some(event) = conn.poll_for_event()? { - events.push(event); + let log = self.log.clone(); + self.channel.process_events(readiness, token, move |event, meta| { + match event { + ChannelEvent::Closed => slog::warn!(log, "Event thread exited"), + ChannelEvent::Msg(event) => callback(event, meta) } - if !events.is_empty() { - callback(events, &mut ())?; - } - conn.flush()?; - Ok(()) - } - let connection = &self.connection; - self.generic.process_events(readiness, token, |_, _| { - inner(connection, &mut callback).map_err(|err| IOError::new(ErrorKind::Other, err))?; - Ok(PostAction::Continue) }) } fn register(&mut self, poll: &mut Poll, factory: &mut TokenFactory) -> IOResult<()> { - self.generic.register(poll, factory) + self.channel.register(poll, factory) } fn reregister(&mut self, poll: &mut Poll, factory: &mut TokenFactory) -> IOResult<()> { - self.generic.reregister(poll, factory) + self.channel.reregister(poll, factory) } fn unregister(&mut self, poll: &mut Poll) -> IOResult<()> { - self.generic.unregister(poll) + self.channel.unregister(poll) + } +} + +/// This thread reads X11 events from the connection and sends them on the channel. +/// +/// This is run in an extra thread since sending an X11 request or waiting for the reply to an X11 +/// request can both read X11 events from the underlying socket which are then saved in the +/// RustConnection. Thus, readability of the underlying socket is not enough to guarantee we do not +/// miss wakeups. +/// +/// This thread will call wait_for_event(). RustConnection then ensures internally to wake us up +/// when an event arrives. So far, this seems to be the only safe way to integrate x11rb with +/// calloop. +fn run_event_thread(connection: Arc, sender: SyncSender, log: slog::Logger) { + loop { + let event = match connection.wait_for_event() { + Ok(event) => event, + Err(err) => { + // Connection errors are most likely permanent. Thus, exit the thread. + slog::crit!(log, "Event thread exiting due to connection error {}", err); + break; + } + }; + match sender.send(event) { + Ok(()) => {}, + Err(_) => { + // The only possible error is that the other end of the channel was dropped. + // This code should be unreachable, because X11Source owns the channel and waits + // for this thread to exit in its Drop implementation. + slog::info!(log, "Event thread exiting due to send error"); + break; + } + } } }