Merge pull request #366 from psychon/x11-event-thread

Anvil: Use a dedicated X11 event thread for event reading
This commit is contained in:
Victor Berger 2021-08-24 11:31:02 +02:00 committed by GitHub
commit f4ba6f5bea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 136 additions and 47 deletions

View File

@ -1,4 +1,6 @@
use std::{cell::RefCell, collections::HashMap, convert::TryFrom, os::unix::net::UnixStream, rc::Rc}; use std::{
cell::RefCell, collections::HashMap, convert::TryFrom, os::unix::net::UnixStream, rc::Rc, sync::Arc,
};
use smithay::{ use smithay::{
reexports::wayland_server::{protocol::wl_surface::WlSurface, Client}, reexports::wayland_server::{protocol::wl_surface::WlSurface, Client},
@ -40,13 +42,13 @@ impl<BackendData: 'static> AnvilState<BackendData> {
let (wm, source) = X11State::start_wm(connection, self.window_map.clone(), self.log.clone()).unwrap(); let (wm, source) = X11State::start_wm(connection, self.window_map.clone(), self.log.clone()).unwrap();
let wm = Rc::new(RefCell::new(wm)); let wm = Rc::new(RefCell::new(wm));
client.data_map().insert_if_missing(|| Rc::clone(&wm)); client.data_map().insert_if_missing(|| Rc::clone(&wm));
let log = self.log.clone();
self.handle self.handle
.insert_source(source, move |events, _, _| { .insert_source(source, move |event, _, _| {
let mut wm = wm.borrow_mut(); match wm.borrow_mut().handle_event(event, &client) {
for event in events.into_iter() { Ok(()) => {}
wm.handle_event(event, &client)?; Err(err) => error!(log, "Error while handling X11 event: {}", err),
} }
Ok(())
}) })
.unwrap(); .unwrap();
} }
@ -60,12 +62,13 @@ x11rb::atom_manager! {
Atoms: AtomsCookie { Atoms: AtomsCookie {
WM_S0, WM_S0,
WL_SURFACE_ID, WL_SURFACE_ID,
_ANVIL_CLOSE_CONNECTION,
} }
} }
/// The actual runtime state of the XWayland integration. /// The actual runtime state of the XWayland integration.
struct X11State { struct X11State {
conn: Rc<RustConnection>, conn: Arc<RustConnection>,
atoms: Atoms, atoms: Atoms,
log: slog::Logger, log: slog::Logger,
unpaired_surfaces: HashMap<u32, (Window, Point<i32, Logical>)>, unpaired_surfaces: HashMap<u32, (Window, Point<i32, Logical>)>,
@ -115,16 +118,16 @@ impl X11State {
conn.flush()?; conn.flush()?;
let conn = Rc::new(conn); let conn = Arc::new(conn);
let wm = Self { let wm = Self {
conn: Rc::clone(&conn), conn: Arc::clone(&conn),
atoms, atoms,
unpaired_surfaces: Default::default(), unpaired_surfaces: Default::default(),
window_map, window_map,
log, log: log.clone(),
}; };
Ok((wm, X11Source::new(conn))) Ok((wm, X11Source::new(conn, win, atoms._ANVIL_CLOSE_CONNECTION, log)))
} }
fn handle_event(&mut self, event: Event, client: &Client) -> Result<(), ReplyOrIdError> { fn handle_event(&mut self, event: Event, client: &Client) -> Result<(), ReplyOrIdError> {
@ -199,6 +202,7 @@ impl X11State {
} }
_ => {} _ => {}
} }
self.conn.flush()?;
Ok(()) Ok(())
} }

View File

@ -1,35 +1,102 @@
use std::{ use std::{
io::{Error as IOError, ErrorKind, Result as IOResult}, io::Result as IOResult,
os::unix::io::AsRawFd, sync::Arc,
rc::Rc, thread::{spawn, JoinHandle},
}; };
use x11rb::{ use x11rb::{
connection::Connection as _, errors::ReplyOrIdError, protocol::Event, rust_connection::RustConnection, connection::Connection as _,
protocol::{
xproto::{Atom, ClientMessageEvent, ConnectionExt as _, EventMask, Window, CLIENT_MESSAGE_EVENT},
Event,
},
rust_connection::RustConnection,
}; };
use smithay::reexports::calloop::{ use smithay::reexports::calloop::{
generic::{Fd, Generic}, channel::{sync_channel, Channel, Event as ChannelEvent, SyncSender},
EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory, EventSource, Poll, PostAction, Readiness, Token, TokenFactory,
}; };
/// Integration of an x11rb X11 connection with calloop.
///
/// This is a thin wrapper around `Channel`. It works by spawning an extra thread reads events from
/// the X11 connection and then sends them across the channel.
///
/// See [1] for why this extra thread is necessary. The single-thread solution proposed on that
/// page does not work with calloop, since it requires checking something on every main loop
/// iteration. Calloop only allows "when an FD becomes readable".
///
/// [1]: https://docs.rs/x11rb/0.8.1/x11rb/event_loop_integration/index.html#threads-and-races
pub struct X11Source { pub struct X11Source {
connection: Rc<RustConnection>, connection: Arc<RustConnection>,
generic: Generic<Fd>, channel: Channel<Event>,
event_thread: Option<JoinHandle<()>>,
close_window: Window,
close_type: Atom,
log: slog::Logger,
} }
impl X11Source { impl X11Source {
pub fn new(connection: Rc<RustConnection>) -> Self { /// Create a new X11 source.
let fd = Fd(connection.stream().as_raw_fd()); ///
let generic = Generic::new(fd, Interest::READ, Mode::Level); /// The returned instance will use `SendRequest` to cause a `ClientMessageEvent` to be sent to
Self { connection, generic } /// the given window with the given type. The expectation is that this is a window that was
/// created by us. Thus, the event reading thread will wake up and check an internal exit flag,
/// then exit.
pub fn new(
connection: Arc<RustConnection>,
close_window: Window,
close_type: Atom,
log: slog::Logger,
) -> Self {
let (sender, channel) = sync_channel(5);
let conn = Arc::clone(&connection);
let log2 = log.clone();
let event_thread = Some(spawn(move || {
run_event_thread(conn, sender, log2);
}));
Self {
connection,
channel,
event_thread,
close_window,
close_type,
log,
}
}
}
impl Drop for X11Source {
fn drop(&mut self) {
// Signal the worker thread to exit by dropping the read end of the channel.
// There is no easy and nice way to do this, so do it the ugly way: Replace it.
let (_, channel) = sync_channel(1);
self.channel = channel;
// Send an event to wake up the worker so that it actually exits
let event = ClientMessageEvent {
response_type: CLIENT_MESSAGE_EVENT,
format: 8,
sequence: 0,
window: self.close_window,
type_: self.close_type,
data: [0; 20].into(),
};
let _ = self
.connection
.send_event(false, self.close_window, EventMask::NO_EVENT, event);
let _ = self.connection.flush();
// Wait for the worker thread to exit
self.event_thread.take().map(|handle| handle.join());
} }
} }
impl EventSource for X11Source { impl EventSource for X11Source {
type Event = Vec<Event>; type Event = Event;
type Metadata = (); type Metadata = ();
type Ret = Result<(), ReplyOrIdError>; type Ret = ();
fn process_events<C>( fn process_events<C>(
&mut self, &mut self,
@ -40,36 +107,54 @@ impl EventSource for X11Source {
where where
C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{ {
fn inner<C>(conn: &RustConnection, mut callback: C) -> Result<(), ReplyOrIdError> let log = self.log.clone();
where self.channel
C: FnMut(Vec<Event>, &mut ()) -> Result<(), ReplyOrIdError>, .process_events(readiness, token, move |event, meta| match event {
{ ChannelEvent::Closed => slog::warn!(log, "Event thread exited"),
let mut events = Vec::new(); ChannelEvent::Msg(event) => callback(event, meta),
while let Some(event) = conn.poll_for_event()? { })
events.push(event);
}
if !events.is_empty() {
callback(events, &mut ())?;
}
conn.flush()?;
Ok(())
}
let connection = &self.connection;
self.generic.process_events(readiness, token, |_, _| {
inner(connection, &mut callback).map_err(|err| IOError::new(ErrorKind::Other, err))?;
Ok(PostAction::Continue)
})
} }
fn register(&mut self, poll: &mut Poll, factory: &mut TokenFactory) -> IOResult<()> { fn register(&mut self, poll: &mut Poll, factory: &mut TokenFactory) -> IOResult<()> {
self.generic.register(poll, factory) self.channel.register(poll, factory)
} }
fn reregister(&mut self, poll: &mut Poll, factory: &mut TokenFactory) -> IOResult<()> { fn reregister(&mut self, poll: &mut Poll, factory: &mut TokenFactory) -> IOResult<()> {
self.generic.reregister(poll, factory) self.channel.reregister(poll, factory)
} }
fn unregister(&mut self, poll: &mut Poll) -> IOResult<()> { fn unregister(&mut self, poll: &mut Poll) -> IOResult<()> {
self.generic.unregister(poll) self.channel.unregister(poll)
}
}
/// This thread reads X11 events from the connection and sends them on the channel.
///
/// This is run in an extra thread since sending an X11 request or waiting for the reply to an X11
/// request can both read X11 events from the underlying socket which are then saved in the
/// RustConnection. Thus, readability of the underlying socket is not enough to guarantee we do not
/// miss wakeups.
///
/// This thread will call wait_for_event(). RustConnection then ensures internally to wake us up
/// when an event arrives. So far, this seems to be the only safe way to integrate x11rb with
/// calloop.
fn run_event_thread(connection: Arc<RustConnection>, sender: SyncSender<Event>, log: slog::Logger) {
loop {
let event = match connection.wait_for_event() {
Ok(event) => event,
Err(err) => {
// Connection errors are most likely permanent. Thus, exit the thread.
slog::crit!(log, "Event thread exiting due to connection error {}", err);
break;
}
};
match sender.send(event) {
Ok(()) => {}
Err(_) => {
// The only possible error is that the other end of the channel was dropped.
// This happens in X11Source's Drop impl.
break;
}
}
} }
} }