Anvil: Move X11 event reading to a thread

x11rb caches X11 events internally. This cache is used when waiting for
the reply of a request and an event is received first.  This cache is
however also used when sending a request, because the X11 client may
never block and stop reading while writing a request. Doing so could
cause a deadlock with the X11 server.

Thus, this commit changes X11Source to spawn a thread internally. This
thread calls wait_for_event() in a loop and uses a calloop channel to
forward these events to the main loop. x11rb's RustConnection internally
uses a ConditionVariable to make sure this thread will wake up when
needed.

Signed-off-by: Uli Schlachter <psychon@znc.in>
This commit is contained in:
Uli Schlachter 2021-08-14 17:25:51 +02:00
parent dbd03567ff
commit 6004ecdc14
2 changed files with 80 additions and 45 deletions

View File

@ -1,4 +1,4 @@
use std::{cell::RefCell, collections::HashMap, convert::TryFrom, os::unix::net::UnixStream, rc::Rc}; use std::{cell::RefCell, collections::HashMap, convert::TryFrom, os::unix::net::UnixStream, rc::Rc, sync::Arc};
use smithay::{ use smithay::{
reexports::wayland_server::{protocol::wl_surface::WlSurface, Client}, reexports::wayland_server::{protocol::wl_surface::WlSurface, Client},
@ -40,13 +40,13 @@ impl<BackendData: 'static> AnvilState<BackendData> {
let (wm, source) = X11State::start_wm(connection, self.window_map.clone(), self.log.clone()).unwrap(); let (wm, source) = X11State::start_wm(connection, self.window_map.clone(), self.log.clone()).unwrap();
let wm = Rc::new(RefCell::new(wm)); let wm = Rc::new(RefCell::new(wm));
client.data_map().insert_if_missing(|| Rc::clone(&wm)); client.data_map().insert_if_missing(|| Rc::clone(&wm));
let log = self.log.clone();
self.handle self.handle
.insert_source(source, move |events, _, _| { .insert_source(source, move |event, _, _| {
let mut wm = wm.borrow_mut(); match wm.borrow_mut().handle_event(event, &client) {
for event in events.into_iter() { Ok(()) => {},
wm.handle_event(event, &client)?; Err(err) => error!(log, "Error while handling X11 event: {}", err),
} }
Ok(())
}) })
.unwrap(); .unwrap();
} }
@ -65,7 +65,7 @@ x11rb::atom_manager! {
/// The actual runtime state of the XWayland integration. /// The actual runtime state of the XWayland integration.
struct X11State { struct X11State {
conn: Rc<RustConnection>, conn: Arc<RustConnection>,
atoms: Atoms, atoms: Atoms,
log: slog::Logger, log: slog::Logger,
unpaired_surfaces: HashMap<u32, (Window, Point<i32, Logical>)>, unpaired_surfaces: HashMap<u32, (Window, Point<i32, Logical>)>,
@ -115,16 +115,16 @@ impl X11State {
conn.flush()?; conn.flush()?;
let conn = Rc::new(conn); let conn = Arc::new(conn);
let wm = Self { let wm = Self {
conn: Rc::clone(&conn), conn: Arc::clone(&conn),
atoms, atoms,
unpaired_surfaces: Default::default(), unpaired_surfaces: Default::default(),
window_map, window_map,
log, log: log.clone(),
}; };
Ok((wm, X11Source::new(conn))) Ok((wm, X11Source::new(conn, log)))
} }
fn handle_event(&mut self, event: Event, client: &Client) -> Result<(), ReplyOrIdError> { fn handle_event(&mut self, event: Event, client: &Client) -> Result<(), ReplyOrIdError> {
@ -199,6 +199,7 @@ impl X11State {
} }
_ => {} _ => {}
} }
self.conn.flush()?;
Ok(()) Ok(())
} }

View File

@ -1,35 +1,48 @@
use std::{ use std::{
io::{Error as IOError, ErrorKind, Result as IOResult}, io::Result as IOResult,
os::unix::io::AsRawFd, sync::{atomic::{AtomicBool, Ordering}, Arc},
rc::Rc, thread::{spawn, JoinHandle},
}; };
use x11rb::{ use x11rb::{
connection::Connection as _, errors::ReplyOrIdError, protocol::Event, rust_connection::RustConnection, connection::Connection as _, protocol::{Event, xproto::{CLIENT_MESSAGE_EVENT, Atom, ConnectionExt as _, ClientMessageEvent, EventMask, Window}}, rust_connection::RustConnection,
}; };
use smithay::reexports::calloop::{ use smithay::reexports::calloop::{
generic::{Fd, Generic}, channel::{sync_channel, Channel, Event as ChannelEvent, SyncSender},
EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory, EventSource, Poll, PostAction, Readiness, Token, TokenFactory,
}; };
/// Integration of an x11rb X11 connection with calloop.
///
/// This is a thin wrapper around `Channel`. It works by spawning an extra thread reads events from
/// the X11 connection and then sends them across the channel.
///
/// See [1] for why this extra thread is necessary. The single-thread solution proposed on that
/// page does not work with calloop, since it requires checking something on every main loop
/// iteration. Calloop only allows "when an FD becomes readable".
///
/// [1]: https://docs.rs/x11rb/0.8.1/x11rb/event_loop_integration/index.html#threads-and-races
pub struct X11Source { pub struct X11Source {
connection: Rc<RustConnection>, channel: Channel<Event>,
generic: Generic<Fd>,
} }
impl X11Source { impl X11Source {
pub fn new(connection: Rc<RustConnection>) -> Self { /// Create a new X11 source.
let fd = Fd(connection.stream().as_raw_fd()); pub fn new(connection: Arc<RustConnection>, log: slog::Logger) -> Self {
let generic = Generic::new(fd, Interest::READ, Mode::Level); let (sender, channel) = sync_channel(5);
Self { connection, generic } let conn = Arc::clone(&connection);
let event_thread = Some(spawn(move || {
run_event_thread(connection, sender, log);
}));
Self { channel }
} }
} }
impl EventSource for X11Source { impl EventSource for X11Source {
type Event = Vec<Event>; type Event = Event;
type Metadata = (); type Metadata = ();
type Ret = Result<(), ReplyOrIdError>; type Ret = ();
fn process_events<C>( fn process_events<C>(
&mut self, &mut self,
@ -40,36 +53,57 @@ impl EventSource for X11Source {
where where
C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{ {
fn inner<C>(conn: &RustConnection, mut callback: C) -> Result<(), ReplyOrIdError> let log = self.log.clone();
where self.channel.process_events(readiness, token, move |event, meta| {
C: FnMut(Vec<Event>, &mut ()) -> Result<(), ReplyOrIdError>, match event {
{ ChannelEvent::Closed => slog::warn!(log, "Event thread exited"),
let mut events = Vec::new(); ChannelEvent::Msg(event) => callback(event, meta)
while let Some(event) = conn.poll_for_event()? {
events.push(event);
} }
if !events.is_empty() {
callback(events, &mut ())?;
}
conn.flush()?;
Ok(())
}
let connection = &self.connection;
self.generic.process_events(readiness, token, |_, _| {
inner(connection, &mut callback).map_err(|err| IOError::new(ErrorKind::Other, err))?;
Ok(PostAction::Continue)
}) })
} }
fn register(&mut self, poll: &mut Poll, factory: &mut TokenFactory) -> IOResult<()> { fn register(&mut self, poll: &mut Poll, factory: &mut TokenFactory) -> IOResult<()> {
self.generic.register(poll, factory) self.channel.register(poll, factory)
} }
fn reregister(&mut self, poll: &mut Poll, factory: &mut TokenFactory) -> IOResult<()> { fn reregister(&mut self, poll: &mut Poll, factory: &mut TokenFactory) -> IOResult<()> {
self.generic.reregister(poll, factory) self.channel.reregister(poll, factory)
} }
fn unregister(&mut self, poll: &mut Poll) -> IOResult<()> { fn unregister(&mut self, poll: &mut Poll) -> IOResult<()> {
self.generic.unregister(poll) self.channel.unregister(poll)
}
}
/// This thread reads X11 events from the connection and sends them on the channel.
///
/// This is run in an extra thread since sending an X11 request or waiting for the reply to an X11
/// request can both read X11 events from the underlying socket which are then saved in the
/// RustConnection. Thus, readability of the underlying socket is not enough to guarantee we do not
/// miss wakeups.
///
/// This thread will call wait_for_event(). RustConnection then ensures internally to wake us up
/// when an event arrives. So far, this seems to be the only safe way to integrate x11rb with
/// calloop.
fn run_event_thread(connection: Arc<RustConnection>, sender: SyncSender<Event>, log: slog::Logger) {
loop {
let event = match connection.wait_for_event() {
Ok(event) => event,
Err(err) => {
// Connection errors are most likely permanent. Thus, exit the thread.
slog::crit!(log, "Event thread exiting due to connection error {}", err);
break;
}
};
match sender.send(event) {
Ok(()) => {},
Err(_) => {
// The only possible error is that the other end of the channel was dropped.
// This code should be unreachable, because X11Source owns the channel and waits
// for this thread to exit in its Drop implementation.
slog::info!(log, "Event thread exiting due to send error");
break;
}
}
} }
} }