From 6004ecdc144aed92a1bd47c4ff5317b0c1486245 Mon Sep 17 00:00:00 2001 From: Uli Schlachter Date: Sat, 14 Aug 2021 17:25:51 +0200 Subject: [PATCH 1/3] Anvil: Move X11 event reading to a thread x11rb caches X11 events internally. This cache is used when waiting for the reply of a request and an event is received first. This cache is however also used when sending a request, because the X11 client may never block and stop reading while writing a request. Doing so could cause a deadlock with the X11 server. Thus, this commit changes X11Source to spawn a thread internally. This thread calls wait_for_event() in a loop and uses a calloop channel to forward these events to the main loop. x11rb's RustConnection internally uses a ConditionVariable to make sure this thread will wake up when needed. Signed-off-by: Uli Schlachter --- anvil/src/xwayland/mod.rs | 23 ++--- anvil/src/xwayland/x11rb_event_source.rs | 102 +++++++++++++++-------- 2 files changed, 80 insertions(+), 45 deletions(-) diff --git a/anvil/src/xwayland/mod.rs b/anvil/src/xwayland/mod.rs index a2d2b44..7df9651 100644 --- a/anvil/src/xwayland/mod.rs +++ b/anvil/src/xwayland/mod.rs @@ -1,4 +1,4 @@ -use std::{cell::RefCell, collections::HashMap, convert::TryFrom, os::unix::net::UnixStream, rc::Rc}; +use std::{cell::RefCell, collections::HashMap, convert::TryFrom, os::unix::net::UnixStream, rc::Rc, sync::Arc}; use smithay::{ reexports::wayland_server::{protocol::wl_surface::WlSurface, Client}, @@ -40,13 +40,13 @@ impl AnvilState { let (wm, source) = X11State::start_wm(connection, self.window_map.clone(), self.log.clone()).unwrap(); let wm = Rc::new(RefCell::new(wm)); client.data_map().insert_if_missing(|| Rc::clone(&wm)); + let log = self.log.clone(); self.handle - .insert_source(source, move |events, _, _| { - let mut wm = wm.borrow_mut(); - for event in events.into_iter() { - wm.handle_event(event, &client)?; + .insert_source(source, move |event, _, _| { + match wm.borrow_mut().handle_event(event, &client) { + Ok(()) => {}, + Err(err) => error!(log, "Error while handling X11 event: {}", err), } - Ok(()) }) .unwrap(); } @@ -65,7 +65,7 @@ x11rb::atom_manager! { /// The actual runtime state of the XWayland integration. struct X11State { - conn: Rc, + conn: Arc, atoms: Atoms, log: slog::Logger, unpaired_surfaces: HashMap)>, @@ -115,16 +115,16 @@ impl X11State { conn.flush()?; - let conn = Rc::new(conn); + let conn = Arc::new(conn); let wm = Self { - conn: Rc::clone(&conn), + conn: Arc::clone(&conn), atoms, unpaired_surfaces: Default::default(), window_map, - log, + log: log.clone(), }; - Ok((wm, X11Source::new(conn))) + Ok((wm, X11Source::new(conn, log))) } fn handle_event(&mut self, event: Event, client: &Client) -> Result<(), ReplyOrIdError> { @@ -199,6 +199,7 @@ impl X11State { } _ => {} } + self.conn.flush()?; Ok(()) } diff --git a/anvil/src/xwayland/x11rb_event_source.rs b/anvil/src/xwayland/x11rb_event_source.rs index f3991e4..f401640 100644 --- a/anvil/src/xwayland/x11rb_event_source.rs +++ b/anvil/src/xwayland/x11rb_event_source.rs @@ -1,35 +1,48 @@ use std::{ - io::{Error as IOError, ErrorKind, Result as IOResult}, - os::unix::io::AsRawFd, - rc::Rc, + io::Result as IOResult, + sync::{atomic::{AtomicBool, Ordering}, Arc}, + thread::{spawn, JoinHandle}, }; use x11rb::{ - connection::Connection as _, errors::ReplyOrIdError, protocol::Event, rust_connection::RustConnection, + connection::Connection as _, protocol::{Event, xproto::{CLIENT_MESSAGE_EVENT, Atom, ConnectionExt as _, ClientMessageEvent, EventMask, Window}}, rust_connection::RustConnection, }; use smithay::reexports::calloop::{ - generic::{Fd, Generic}, - EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory, + channel::{sync_channel, Channel, Event as ChannelEvent, SyncSender}, + EventSource, Poll, PostAction, Readiness, Token, TokenFactory, }; +/// Integration of an x11rb X11 connection with calloop. +/// +/// This is a thin wrapper around `Channel`. It works by spawning an extra thread reads events from +/// the X11 connection and then sends them across the channel. +/// +/// See [1] for why this extra thread is necessary. The single-thread solution proposed on that +/// page does not work with calloop, since it requires checking something on every main loop +/// iteration. Calloop only allows "when an FD becomes readable". +/// +/// [1]: https://docs.rs/x11rb/0.8.1/x11rb/event_loop_integration/index.html#threads-and-races pub struct X11Source { - connection: Rc, - generic: Generic, + channel: Channel, } impl X11Source { - pub fn new(connection: Rc) -> Self { - let fd = Fd(connection.stream().as_raw_fd()); - let generic = Generic::new(fd, Interest::READ, Mode::Level); - Self { connection, generic } + /// Create a new X11 source. + pub fn new(connection: Arc, log: slog::Logger) -> Self { + let (sender, channel) = sync_channel(5); + let conn = Arc::clone(&connection); + let event_thread = Some(spawn(move || { + run_event_thread(connection, sender, log); + })); + Self { channel } } } impl EventSource for X11Source { - type Event = Vec; + type Event = Event; type Metadata = (); - type Ret = Result<(), ReplyOrIdError>; + type Ret = (); fn process_events( &mut self, @@ -40,36 +53,57 @@ impl EventSource for X11Source { where C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, { - fn inner(conn: &RustConnection, mut callback: C) -> Result<(), ReplyOrIdError> - where - C: FnMut(Vec, &mut ()) -> Result<(), ReplyOrIdError>, - { - let mut events = Vec::new(); - while let Some(event) = conn.poll_for_event()? { - events.push(event); + let log = self.log.clone(); + self.channel.process_events(readiness, token, move |event, meta| { + match event { + ChannelEvent::Closed => slog::warn!(log, "Event thread exited"), + ChannelEvent::Msg(event) => callback(event, meta) } - if !events.is_empty() { - callback(events, &mut ())?; - } - conn.flush()?; - Ok(()) - } - let connection = &self.connection; - self.generic.process_events(readiness, token, |_, _| { - inner(connection, &mut callback).map_err(|err| IOError::new(ErrorKind::Other, err))?; - Ok(PostAction::Continue) }) } fn register(&mut self, poll: &mut Poll, factory: &mut TokenFactory) -> IOResult<()> { - self.generic.register(poll, factory) + self.channel.register(poll, factory) } fn reregister(&mut self, poll: &mut Poll, factory: &mut TokenFactory) -> IOResult<()> { - self.generic.reregister(poll, factory) + self.channel.reregister(poll, factory) } fn unregister(&mut self, poll: &mut Poll) -> IOResult<()> { - self.generic.unregister(poll) + self.channel.unregister(poll) + } +} + +/// This thread reads X11 events from the connection and sends them on the channel. +/// +/// This is run in an extra thread since sending an X11 request or waiting for the reply to an X11 +/// request can both read X11 events from the underlying socket which are then saved in the +/// RustConnection. Thus, readability of the underlying socket is not enough to guarantee we do not +/// miss wakeups. +/// +/// This thread will call wait_for_event(). RustConnection then ensures internally to wake us up +/// when an event arrives. So far, this seems to be the only safe way to integrate x11rb with +/// calloop. +fn run_event_thread(connection: Arc, sender: SyncSender, log: slog::Logger) { + loop { + let event = match connection.wait_for_event() { + Ok(event) => event, + Err(err) => { + // Connection errors are most likely permanent. Thus, exit the thread. + slog::crit!(log, "Event thread exiting due to connection error {}", err); + break; + } + }; + match sender.send(event) { + Ok(()) => {}, + Err(_) => { + // The only possible error is that the other end of the channel was dropped. + // This code should be unreachable, because X11Source owns the channel and waits + // for this thread to exit in its Drop implementation. + slog::info!(log, "Event thread exiting due to send error"); + break; + } + } } } From 05e5036584412db7654dfbcacfda1a6f7eaa94af Mon Sep 17 00:00:00 2001 From: Uli Schlachter Date: Sat, 14 Aug 2021 17:37:26 +0200 Subject: [PATCH 2/3] Exit the worker thread when X11Source is dropped The previous commit added a new worker thread. This thread might sit in wait_for_event() indefinitely even after the X11Source was dropped. This is because nothing guarantees that an X11 event will come in "soonish". And only then would the thread notice that its main thread is gone. This commit cleans that up by having X11Source explicitly wake up the event thread and wait for it to exit in its Drop implementation. Signed-off-by: Uli Schlachter --- anvil/src/xwayland/mod.rs | 3 +- anvil/src/xwayland/x11rb_event_source.rs | 47 ++++++++++++++++++++---- 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/anvil/src/xwayland/mod.rs b/anvil/src/xwayland/mod.rs index 7df9651..68e14f3 100644 --- a/anvil/src/xwayland/mod.rs +++ b/anvil/src/xwayland/mod.rs @@ -60,6 +60,7 @@ x11rb::atom_manager! { Atoms: AtomsCookie { WM_S0, WL_SURFACE_ID, + _ANVIL_CLOSE_CONNECTION, } } @@ -124,7 +125,7 @@ impl X11State { log: log.clone(), }; - Ok((wm, X11Source::new(conn, log))) + Ok((wm, X11Source::new(conn, win, atoms._ANVIL_CLOSE_CONNECTION, log))) } fn handle_event(&mut self, event: Event, client: &Client) -> Result<(), ReplyOrIdError> { diff --git a/anvil/src/xwayland/x11rb_event_source.rs b/anvil/src/xwayland/x11rb_event_source.rs index f401640..db1b07b 100644 --- a/anvil/src/xwayland/x11rb_event_source.rs +++ b/anvil/src/xwayland/x11rb_event_source.rs @@ -1,6 +1,6 @@ use std::{ io::Result as IOResult, - sync::{atomic::{AtomicBool, Ordering}, Arc}, + sync::Arc, thread::{spawn, JoinHandle}, }; @@ -24,18 +24,53 @@ use smithay::reexports::calloop::{ /// /// [1]: https://docs.rs/x11rb/0.8.1/x11rb/event_loop_integration/index.html#threads-and-races pub struct X11Source { + connection: Arc, channel: Channel, + event_thread: Option>, + close_window: Window, + close_type: Atom, + log: slog::Logger, } impl X11Source { /// Create a new X11 source. - pub fn new(connection: Arc, log: slog::Logger) -> Self { + /// + /// The returned instance will use `SendRequest` to cause a `ClientMessageEvent` to be sent to + /// the given window with the given type. The expectation is that this is a window that was + /// created by us. Thus, the event reading thread will wake up and check an internal exit flag, + /// then exit. + pub fn new(connection: Arc, close_window: Window, close_type: Atom, log: slog::Logger) -> Self { let (sender, channel) = sync_channel(5); let conn = Arc::clone(&connection); + let log2 = log.clone(); let event_thread = Some(spawn(move || { - run_event_thread(connection, sender, log); + run_event_thread(conn, sender, log2); })); - Self { channel } + Self { connection, channel, event_thread, close_window, close_type, log } + } +} + +impl Drop for X11Source { + fn drop(&mut self) { + // Signal the worker thread to exit by dropping the read end of the channel. + // There is no easy and nice way to do this, so do it the ugly way: Replace it. + let (_, channel) = sync_channel(1); + self.channel = channel; + + // Send an event to wake up the worker so that it actually exits + let event = ClientMessageEvent { + response_type: CLIENT_MESSAGE_EVENT, + format: 8, + sequence: 0, + window: self.close_window, + type_: self.close_type, + data: [0; 20].into(), + }; + let _ = self.connection.send_event(false, self.close_window, EventMask::NO_EVENT, event); + let _ = self.connection.flush(); + + // Wait for the worker thread to exit + self.event_thread.take().map(|handle| handle.join()); } } @@ -99,9 +134,7 @@ fn run_event_thread(connection: Arc, sender: SyncSender, Ok(()) => {}, Err(_) => { // The only possible error is that the other end of the channel was dropped. - // This code should be unreachable, because X11Source owns the channel and waits - // for this thread to exit in its Drop implementation. - slog::info!(log, "Event thread exiting due to send error"); + // This happens in X11Source's Drop impl. break; } } From 9af64eb0b39bf34105891f08054dd8442b3473f1 Mon Sep 17 00:00:00 2001 From: Uli Schlachter Date: Sat, 14 Aug 2021 17:53:32 +0200 Subject: [PATCH 3/3] rustfmt Signed-off-by: Uli Schlachter --- anvil/src/xwayland/mod.rs | 6 ++-- anvil/src/xwayland/x11rb_event_source.rs | 38 +++++++++++++++++------- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/anvil/src/xwayland/mod.rs b/anvil/src/xwayland/mod.rs index 68e14f3..f876dba 100644 --- a/anvil/src/xwayland/mod.rs +++ b/anvil/src/xwayland/mod.rs @@ -1,4 +1,6 @@ -use std::{cell::RefCell, collections::HashMap, convert::TryFrom, os::unix::net::UnixStream, rc::Rc, sync::Arc}; +use std::{ + cell::RefCell, collections::HashMap, convert::TryFrom, os::unix::net::UnixStream, rc::Rc, sync::Arc, +}; use smithay::{ reexports::wayland_server::{protocol::wl_surface::WlSurface, Client}, @@ -44,7 +46,7 @@ impl AnvilState { self.handle .insert_source(source, move |event, _, _| { match wm.borrow_mut().handle_event(event, &client) { - Ok(()) => {}, + Ok(()) => {} Err(err) => error!(log, "Error while handling X11 event: {}", err), } }) diff --git a/anvil/src/xwayland/x11rb_event_source.rs b/anvil/src/xwayland/x11rb_event_source.rs index db1b07b..694b434 100644 --- a/anvil/src/xwayland/x11rb_event_source.rs +++ b/anvil/src/xwayland/x11rb_event_source.rs @@ -5,7 +5,12 @@ use std::{ }; use x11rb::{ - connection::Connection as _, protocol::{Event, xproto::{CLIENT_MESSAGE_EVENT, Atom, ConnectionExt as _, ClientMessageEvent, EventMask, Window}}, rust_connection::RustConnection, + connection::Connection as _, + protocol::{ + xproto::{Atom, ClientMessageEvent, ConnectionExt as _, EventMask, Window, CLIENT_MESSAGE_EVENT}, + Event, + }, + rust_connection::RustConnection, }; use smithay::reexports::calloop::{ @@ -39,14 +44,26 @@ impl X11Source { /// the given window with the given type. The expectation is that this is a window that was /// created by us. Thus, the event reading thread will wake up and check an internal exit flag, /// then exit. - pub fn new(connection: Arc, close_window: Window, close_type: Atom, log: slog::Logger) -> Self { + pub fn new( + connection: Arc, + close_window: Window, + close_type: Atom, + log: slog::Logger, + ) -> Self { let (sender, channel) = sync_channel(5); let conn = Arc::clone(&connection); let log2 = log.clone(); let event_thread = Some(spawn(move || { run_event_thread(conn, sender, log2); })); - Self { connection, channel, event_thread, close_window, close_type, log } + Self { + connection, + channel, + event_thread, + close_window, + close_type, + log, + } } } @@ -66,7 +83,9 @@ impl Drop for X11Source { type_: self.close_type, data: [0; 20].into(), }; - let _ = self.connection.send_event(false, self.close_window, EventMask::NO_EVENT, event); + let _ = self + .connection + .send_event(false, self.close_window, EventMask::NO_EVENT, event); let _ = self.connection.flush(); // Wait for the worker thread to exit @@ -89,12 +108,11 @@ impl EventSource for X11Source { C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, { let log = self.log.clone(); - self.channel.process_events(readiness, token, move |event, meta| { - match event { + self.channel + .process_events(readiness, token, move |event, meta| match event { ChannelEvent::Closed => slog::warn!(log, "Event thread exited"), - ChannelEvent::Msg(event) => callback(event, meta) - } - }) + ChannelEvent::Msg(event) => callback(event, meta), + }) } fn register(&mut self, poll: &mut Poll, factory: &mut TokenFactory) -> IOResult<()> { @@ -131,7 +149,7 @@ fn run_event_thread(connection: Arc, sender: SyncSender, } }; match sender.send(event) { - Ok(()) => {}, + Ok(()) => {} Err(_) => { // The only possible error is that the other end of the channel was dropped. // This happens in X11Source's Drop impl.