backend.session: rework as calloop event sources

Rework the Session Notifiers so that they are calloop event sources
by themselves, allowing them to be inserted by the user without the
`bind_session` dance.

Also update the logind backend to use the current dbus-rs API, rather
than the deprecated one.
This commit is contained in:
Victor Berger 2020-05-19 23:21:19 +02:00 committed by Victor Berger
parent 2d5e829e12
commit b05c2ccbba
5 changed files with 363 additions and 305 deletions

View File

@ -29,8 +29,8 @@ use smithay::{
graphics::{CursorBackend, SwapBuffersError},
libinput::{LibinputInputBackend, LibinputSessionInterface},
session::{
auto::{auto_session_bind, AutoSession},
notify_multiplexer, AsSessionObserver, Session, SessionNotifier, SessionObserver,
auto::AutoSession, notify_multiplexer, AsSessionObserver, Session, SessionNotifier,
SessionObserver,
},
udev::{primary_gpu, UdevBackend, UdevEvent},
},
@ -209,10 +209,10 @@ pub fn run_udev(
anvil_state.process_input_event(event)
})
.unwrap();
let session_event_source = auto_session_bind(notifier, event_loop.handle())
.map_err(|(e, _)| e)
let session_event_source = event_loop
.handle()
.insert_source(notifier, |(), &mut (), _anvil_state| {})
.unwrap();
for (dev, path) in udev_backend.device_list() {
udev_handler.device_added(dev, path.into())
}
@ -246,7 +246,7 @@ pub fn run_udev(
// Cleanup stuff
state.window_map.borrow_mut().clear();
let mut notifier = session_event_source.unbind();
let mut notifier = event_loop.handle().remove(session_event_source);
notifier.unregister(libinput_session_id);
notifier.unregister(udev_session_id);

View File

@ -27,17 +27,20 @@
//!
//! It is crucial to avoid errors during that state. Examples for object that might be registered
//! for notifications are the [`Libinput`](input::Libinput) context or the [`Device`](::backend::drm::Device).
//!
//! The [`AutoSessionNotifier`](::backend::session::auto::AutoSessionNotifier) is to be inserted into
//! a calloop event source to have its events processed.
#[cfg(feature = "backend_session_logind")]
use super::logind::{self, logind_session_bind, BoundLogindSession, LogindSession, LogindSessionNotifier};
use super::logind::{self, LogindSession, LogindSessionNotifier};
use super::{
direct::{self, direct_session_bind, BoundDirectSession, DirectSession, DirectSessionNotifier},
direct::{self, DirectSession, DirectSessionNotifier},
AsErrno, Session, SessionNotifier, SessionObserver,
};
use nix::fcntl::OFlag;
use std::{cell::RefCell, io::Error as IoError, os::unix::io::RawFd, path::Path, rc::Rc};
use std::{cell::RefCell, io, os::unix::io::RawFd, path::Path, rc::Rc};
use calloop::LoopHandle;
use calloop::{EventSource, Poll, Readiness, Token};
/// [`Session`] using the best available interface
#[derive(Clone)]
@ -58,19 +61,6 @@ pub enum AutoSessionNotifier {
Direct(DirectSessionNotifier),
}
/// Bound session that is driven by a [`EventLoop`](calloop::EventLoop).
///
/// See [`auto_session_bind`] for details.
///
/// Dropping this object will close the session just like the [`AutoSessionNotifier`].
pub enum BoundAutoSession {
/// Bound logind session
#[cfg(feature = "backend_session_logind")]
Logind(BoundLogindSession),
/// Bound direct / tty session
Direct(BoundDirectSession),
}
/// Id's used by the [`AutoSessionNotifier`] internally.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct AutoId(AutoIdInternal);
@ -139,26 +129,6 @@ impl AutoSession {
}
}
/// Bind an [`AutoSessionNotifier`] to an [`EventLoop`](calloop::EventLoop).
///
/// Allows the [`AutoSessionNotifier`] to listen for incoming signals signalling the session state.
/// If you don't use this function [`AutoSessionNotifier`] will not correctly tell you the
/// session state and call its [`SessionObserver`]s.
pub fn auto_session_bind<Data: 'static>(
notifier: AutoSessionNotifier,
handle: LoopHandle<Data>,
) -> ::std::result::Result<BoundAutoSession, (IoError, AutoSessionNotifier)> {
Ok(match notifier {
#[cfg(feature = "backend_session_logind")]
AutoSessionNotifier::Logind(logind) => BoundAutoSession::Logind(
logind_session_bind(logind, handle).map_err(|(e, n)| (e, AutoSessionNotifier::Logind(n)))?,
),
AutoSessionNotifier::Direct(direct) => BoundAutoSession::Direct(
direct_session_bind(direct, handle).map_err(|(e, n)| (e, AutoSessionNotifier::Direct(n)))?,
),
})
}
impl Session for AutoSession {
type Error = Error;
@ -208,10 +178,10 @@ impl SessionNotifier for AutoSessionNotifier {
match *self {
#[cfg(feature = "backend_session_logind")]
AutoSessionNotifier::Logind(ref mut logind) => {
AutoId(AutoIdInternal::Logind(logind.register(signal)))
AutoId(AutoIdInternal::Logind(SessionNotifier::register(logind, signal)))
}
AutoSessionNotifier::Direct(ref mut direct) => {
AutoId(AutoIdInternal::Direct(direct.register(signal)))
AutoId(AutoIdInternal::Direct(SessionNotifier::register(direct, signal)))
}
}
}
@ -221,10 +191,10 @@ impl SessionNotifier for AutoSessionNotifier {
match (self, signal) {
#[cfg(feature = "backend_session_logind")]
(&mut AutoSessionNotifier::Logind(ref mut logind), AutoId(AutoIdInternal::Logind(signal))) => {
logind.unregister(signal)
SessionNotifier::unregister(logind, signal)
}
(&mut AutoSessionNotifier::Direct(ref mut direct), AutoId(AutoIdInternal::Direct(signal))) => {
direct.unregister(signal)
SessionNotifier::unregister(direct, signal)
}
// this pattern is needed when the logind backend is activated
_ => unreachable!(),
@ -232,13 +202,43 @@ impl SessionNotifier for AutoSessionNotifier {
}
}
impl BoundAutoSession {
/// Unbind the session from the [`EventLoop`](calloop::EventLoop) again
pub fn unbind(self) -> AutoSessionNotifier {
impl EventSource for AutoSessionNotifier {
type Event = ();
type Metadata = ();
type Ret = ();
fn process_events<F>(&mut self, readiness: Readiness, token: Token, callback: F) -> io::Result<()>
where
F: FnMut((), &mut ()),
{
match self {
#[cfg(feature = "backend_session_logind")]
BoundAutoSession::Logind(logind) => AutoSessionNotifier::Logind(logind.unbind()),
BoundAutoSession::Direct(direct) => AutoSessionNotifier::Direct(direct.unbind()),
AutoSessionNotifier::Logind(s) => s.process_events(readiness, token, callback),
AutoSessionNotifier::Direct(s) => s.process_events(readiness, token, callback),
}
}
fn register(&mut self, poll: &mut Poll, token: Token) -> io::Result<()> {
match self {
#[cfg(feature = "backend_session_logind")]
AutoSessionNotifier::Logind(s) => EventSource::register(s, poll, token),
AutoSessionNotifier::Direct(s) => EventSource::register(s, poll, token),
}
}
fn reregister(&mut self, poll: &mut Poll, token: Token) -> io::Result<()> {
match self {
#[cfg(feature = "backend_session_logind")]
AutoSessionNotifier::Logind(s) => EventSource::reregister(s, poll, token),
AutoSessionNotifier::Direct(s) => EventSource::reregister(s, poll, token),
}
}
fn unregister(&mut self, poll: &mut Poll) -> io::Result<()> {
match self {
#[cfg(feature = "backend_session_logind")]
AutoSessionNotifier::Logind(s) => EventSource::unregister(s, poll),
AutoSessionNotifier::Direct(s) => EventSource::unregister(s, poll),
}
}
}

View File

@ -30,11 +30,13 @@
//!
//! It is crucial to avoid errors during that state. Examples for object that might be registered
//! for notifications are the [`Libinput`](input::Libinput) context or the [`Device`](::backend::drm::Device).
//!
//! The [`LogindSessionNotifier`](::backend::session::dbus::logind::LogindSessionNotifier) is to be inserted into
//! a calloop event source to have its events processed.
use crate::backend::session::{AsErrno, Session, SessionNotifier, SessionObserver};
use dbus::{
arg::{messageitem::MessageItem, OwnedFd},
ffidisp::{BusType, Connection, ConnectionItem, Watch, WatchEvent},
strings::{BusName, Interface, Member, Path as DbusPath},
Message,
};
@ -52,14 +54,13 @@ use std::{
};
use systemd::login;
use calloop::{
generic::{Fd, Generic},
InsertError, Interest, LoopHandle, Readiness, Source,
};
use calloop::{EventSource, Poll, Readiness, Token};
use super::DBusConnection;
struct LogindSessionImpl {
session_id: String,
conn: RefCell<Connection>,
conn: RefCell<DBusConnection>,
session_path: DbusPath<'static>,
active: AtomicBool,
signals: RefCell<Vec<Option<Box<dyn SessionObserver>>>>,
@ -95,7 +96,7 @@ impl LogindSession {
let vt = login::get_vt(session_id.clone()).ok();
// Create dbus connection
let conn = Connection::get_private(BusType::System).map_err(Error::FailedDbusConnection)?;
let conn = DBusConnection::new_system().map_err(Error::FailedDbusConnection)?;
// and get the session path
let session_path = LogindSessionImpl::blocking_call(
&conn,
@ -202,7 +203,7 @@ impl LogindSessionNotifier {
impl LogindSessionImpl {
fn blocking_call<'d, 'p, 'i, 'm, D, P, I, M>(
conn: &Connection,
conn: &DBusConnection,
destination: D,
path: P,
interface: I,
@ -226,8 +227,9 @@ impl LogindSessionImpl {
message.append_items(&arguments)
};
let mut message =
conn.send_with_reply_and_block(message, 1000)
let mut message = conn
.channel()
.send_with_reply_and_block(message, std::time::Duration::from_millis(1000))
.map_err(|source| Error::FailedToSendDbusCall {
bus: destination.clone(),
path: path.clone(),
@ -248,16 +250,7 @@ impl LogindSessionImpl {
}
}
fn handle_signals<I>(&self, signals: I) -> Result<(), Error>
where
I: IntoIterator<Item = ConnectionItem>,
{
for item in signals {
let message = if let ConnectionItem::Signal(ref s) = item {
s
} else {
continue;
};
fn handle_message(&self, message: dbus::Message) -> Result<(), Error> {
if &*message.interface().unwrap() == "org.freedesktop.login1.Manager"
&& &*message.member().unwrap() == "SessionRemoved"
&& message.get1::<String>().unwrap() == self.session_id
@ -334,14 +327,22 @@ impl LogindSessionImpl {
{
use dbus::arg::{Array, Dict, Get, Iter, Variant};
let (_, changed, _) =
message.get3::<String, Dict<'_, String, Variant<Iter<'_>>, Iter<'_>>, Array<'_, String, Iter<'_>>>();
let (_, changed, _) = message
.get3::<String, Dict<'_, String, Variant<Iter<'_>>, Iter<'_>>, Array<'_, String, Iter<'_>>>();
let mut changed = changed.ok_or(Error::UnexpectedMethodReturn)?;
if let Some((_, mut value)) = changed.find(|&(ref key, _)| &*key == "Active") {
if let Some(active) = Get::get(&mut value.0) {
self.active.store(active, Ordering::SeqCst);
}
}
} else {
// Handle default replies if necessary
if let Some(reply) = dbus::channel::default_reply(&message) {
self.conn
.borrow()
.channel()
.send(reply)
.map_err(|()| Error::SessionLost)?;
}
}
Ok(())
@ -439,78 +440,6 @@ impl SessionNotifier for LogindSessionNotifier {
}
}
/// Bound logind session that is driven by the [`EventLoop`](calloop::EventLoop).
///
/// See [`logind_session_bind`] for details.
///
/// Dropping this object will close the logind session just like the [`LogindSessionNotifier`].
pub struct BoundLogindSession {
notifier: LogindSessionNotifier,
_watches: Vec<Watch>,
sources: Vec<Source<Generic<Fd>>>,
kill_source: Box<dyn Fn(Source<Generic<Fd>>)>,
}
/// Bind a [`LogindSessionNotifier`] to an [`EventLoop`](calloop::EventLoop).
///
/// Allows the [`LogindSessionNotifier`] to listen for incoming signals signalling the session state.
/// If you don't use this function [`LogindSessionNotifier`] will not correctly tell you the logind
/// session state and call it's [`SessionObserver`]s.
pub fn logind_session_bind<Data: 'static>(
notifier: LogindSessionNotifier,
handle: LoopHandle<Data>,
) -> ::std::result::Result<BoundLogindSession, (IoError, LogindSessionNotifier)> {
let watches = notifier.internal.conn.borrow().watch_fds();
let internal_for_error = notifier.internal.clone();
let sources = watches
.clone()
.into_iter()
.filter_map(|watch| {
let interest = match (watch.writable(), watch.readable()) {
(true, true) => Interest::Both,
(true, false) => Interest::Writable,
(false, true) => Interest::Readable,
(false, false) => return None,
};
let source = Generic::from_fd(watch.fd(), interest, calloop::Mode::Level);
let source = handle.insert_source(source, {
let mut notifier = notifier.clone();
move |readiness, fd, _| {
notifier.event(readiness, fd.0);
Ok(())
}
});
Some(source)
})
.collect::<::std::result::Result<Vec<Source<Generic<Fd>>>, InsertError<Generic<Fd>>>>()
.map_err(|err| {
(
err.into(),
LogindSessionNotifier {
internal: internal_for_error,
},
)
})?;
Ok(BoundLogindSession {
notifier,
_watches: watches,
sources,
kill_source: Box::new(move |source| handle.kill(source)),
})
}
impl BoundLogindSession {
/// Unbind the logind session from the [`EventLoop`](calloop::EventLoop)
pub fn unbind(self) -> LogindSessionNotifier {
for source in self.sources {
(self.kill_source)(source);
}
self.notifier
}
}
impl Drop for LogindSessionNotifier {
fn drop(&mut self) {
info!(self.internal.logger, "Closing logind session");
@ -526,25 +455,43 @@ impl Drop for LogindSessionNotifier {
}
}
impl LogindSessionNotifier {
fn event(&mut self, readiness: Readiness, fd: RawFd) {
let conn = self.internal.conn.borrow();
let items = conn.watch_handle(
fd,
if readiness.readable && readiness.writable {
WatchEvent::Readable as u32 | WatchEvent::Writable as u32
} else if readiness.readable {
WatchEvent::Readable as u32
} else if readiness.writable {
WatchEvent::Writable as u32
} else {
return;
},
);
if let Err(err) = self.internal.handle_signals(items) {
error!(self.internal.logger, "Error handling dbus signals: {}", err);
impl EventSource for LogindSessionNotifier {
type Event = ();
type Metadata = ();
type Ret = ();
fn process_events<F>(&mut self, readiness: Readiness, token: Token, _: F) -> std::io::Result<()>
where
F: FnMut((), &mut ()),
{
// Accumulate the messages, and then process them, as we can't keep the borrow on the `DBusConnection`
// while processing the messages
let mut messages = Vec::new();
self.internal
.conn
.borrow_mut()
.process_events(readiness, token, |msg, _| messages.push(msg))?;
for msg in messages {
if let Err(err) = self.internal.handle_message(msg) {
error!(self.internal.logger, "Error handling dbus messages: {}", err);
}
}
Ok(())
}
fn register(&mut self, poll: &mut Poll, token: Token) -> std::io::Result<()> {
self.internal.conn.borrow_mut().register(poll, token)
}
fn reregister(&mut self, poll: &mut Poll, token: Token) -> std::io::Result<()> {
self.internal.conn.borrow_mut().reregister(poll, token)
}
fn unregister(&mut self, poll: &mut Poll) -> std::io::Result<()> {
self.internal.conn.borrow_mut().unregister(poll)
}
}
/// Errors related to logind sessions

View File

@ -1,2 +1,116 @@
use std::io;
use calloop::{EventSource, Interest, Mode, Poll, Readiness, Token};
use dbus::{
blocking::LocalConnection,
channel::{BusType, Channel, Watch},
Message,
};
#[cfg(feature = "backend_session_logind")]
pub mod logind;
/// An internal wrapper for handling a DBus connection
///
/// It acts as a calloop event source to dispatch the DBus events
pub(crate) struct DBusConnection {
cx: LocalConnection,
current_watch: Watch,
}
impl DBusConnection {
pub fn new_system() -> Result<DBusConnection, dbus::Error> {
let mut chan = Channel::get_private(BusType::System)?;
chan.set_watch_enabled(true);
Ok(DBusConnection {
cx: chan.into(),
current_watch: Watch {
fd: -1,
read: false,
write: false,
},
})
}
pub fn add_match(&self, match_str: &str) -> Result<(), dbus::Error> {
self.cx.add_match_no_cb(match_str)
}
pub fn channel(&self) -> &Channel {
self.cx.channel()
}
}
impl EventSource for DBusConnection {
type Event = Message;
type Metadata = DBusConnection;
type Ret = ();
fn process_events<F>(&mut self, _: Readiness, _: Token, mut callback: F) -> io::Result<()>
where
F: FnMut(Message, &mut DBusConnection) -> (),
{
self.cx
.channel()
.read_write(Some(std::time::Duration::from_millis(0)))
.map_err(|()| io::Error::new(io::ErrorKind::NotConnected, "DBus connection is closed"))?;
while let Some(message) = self.cx.channel().pop_message() {
callback(message, self);
}
self.cx.channel().flush();
Ok(())
}
fn register(&mut self, poll: &mut Poll, token: Token) -> io::Result<()> {
if self.current_watch.read || self.current_watch.write {
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"DBus session already registered to calloop",
));
}
// reregister handles all the watch logic
self.reregister(poll, token)
}
fn reregister(&mut self, poll: &mut Poll, token: Token) -> io::Result<()> {
let new_watch = self.cx.channel().watch();
let new_interest = match (new_watch.read, new_watch.write) {
(true, true) => Some(Interest::Both),
(true, false) => Some(Interest::Readable),
(false, true) => Some(Interest::Writable),
(false, false) => None,
};
if new_watch.fd != self.current_watch.fd {
// remove the previous fd
if self.current_watch.read || self.current_watch.write {
poll.unregister(self.current_watch.fd)?;
}
// insert the new one
if let Some(interest) = new_interest {
poll.register(new_watch.fd, interest, Mode::Level, token)?;
}
} else {
// update the registration
if let Some(interest) = new_interest {
poll.reregister(self.current_watch.fd, interest, Mode::Level, token)?;
} else {
poll.unregister(self.current_watch.fd)?;
}
}
self.current_watch = new_watch;
Ok(())
}
fn unregister(&mut self, poll: &mut Poll) -> io::Result<()> {
if self.current_watch.read || self.current_watch.write {
poll.unregister(self.current_watch.fd)?;
}
self.current_watch = Watch {
fd: -1,
read: false,
write: false,
};
Ok(())
}
}

View File

@ -41,12 +41,12 @@
//!
//! It is crucial to avoid errors during that state. Examples for object that might be registered
//! for notifications are the [`Libinput`](input::Libinput) context or the [`Device`](::backend::drm::Device).
//!
//! The [`DirectSessionNotifier`](::backend::session::direct::DirectSessionNotifier) is to be inserted into
//! a calloop event source to have its events processed.
use super::{AsErrno, Session, SessionNotifier, SessionObserver};
use calloop::{
signals::{Signal, Signals},
LoopHandle, Source,
};
use calloop::signals::{Signal, Signals};
use nix::{
fcntl::{self, open, OFlag},
libc::c_int,
@ -55,11 +55,8 @@ use nix::{
Error as NixError, Result as NixResult,
};
use std::{
cell::RefCell,
io::Error as IoError,
os::unix::io::RawFd,
path::Path,
rc::Rc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
@ -158,6 +155,7 @@ pub struct DirectSessionNotifier {
signals: Vec<Option<Box<dyn SessionObserver>>>,
signal: Signal,
logger: ::slog::Logger,
source: Option<Signals>,
}
impl DirectSession {
@ -201,6 +199,7 @@ impl DirectSession {
signals: Vec::new(),
signal,
logger: logger.new(o!("vt" => format!("{}", vt), "component" => "session_notifier")),
source: None,
},
)),
Err(err) => {
@ -390,64 +389,62 @@ impl DirectSessionNotifier {
}
}
/// Bound logind session that is driven by the [`EventLoop`](calloop::EventLoop).
///
/// See [`direct_session_bind`] for details.
pub struct BoundDirectSession {
source: Source<Signals>,
notifier: Rc<RefCell<DirectSessionNotifier>>,
kill_source: Box<dyn Fn(Source<Signals>)>,
impl calloop::EventSource for DirectSessionNotifier {
type Event = ();
type Metadata = ();
type Ret = ();
fn process_events<F>(
&mut self,
readiness: calloop::Readiness,
token: calloop::Token,
_: F,
) -> std::io::Result<()>
where
F: FnMut((), &mut ()),
{
let mut source = self.source.take();
if let Some(ref mut source) = source {
source.process_events(readiness, token, |_, _| self.signal_received())?;
}
self.source = source;
Ok(())
}
impl BoundDirectSession {
/// Unbind the direct session from the [`EventLoop`](calloop::EventLoop)
pub fn unbind(self) -> DirectSessionNotifier {
let BoundDirectSession {
source,
notifier,
kill_source,
} = self;
kill_source(source);
Rc::try_unwrap(notifier)
.map(RefCell::into_inner)
.unwrap_or_else(|_| panic!("Notifier should have been freed from the event loop!"))
fn register(&mut self, poll: &mut calloop::Poll, token: calloop::Token) -> std::io::Result<()> {
if self.source.is_some() {
return Err(std::io::Error::new(
std::io::ErrorKind::AlreadyExists,
"This DirectSessionNotifier is already registered.",
));
}
let mut source = Signals::new(&[self.signal])?;
source.register(poll, token)?;
self.source = Some(source);
Ok(())
}
fn reregister(&mut self, poll: &mut calloop::Poll, token: calloop::Token) -> std::io::Result<()> {
if let Some(ref mut source) = self.source {
source.reregister(poll, token)
} else {
Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"This DirectSessionNotifier is not currently registered.",
))
}
}
/// Bind a [`DirectSessionNotifier`] to an [`EventLoop`](calloop::EventLoop).
///
/// Allows the [`DirectSessionNotifier`] to listen for incoming signals signalling the session state.
/// If you don't use this function [`DirectSessionNotifier`] will not correctly tell you the current
/// session state and call it's [`SessionObserver`]s.
pub fn direct_session_bind<Data: 'static>(
notifier: DirectSessionNotifier,
handle: LoopHandle<Data>,
) -> ::std::result::Result<BoundDirectSession, (IoError, DirectSessionNotifier)> {
let signal = notifier.signal;
let source = match Signals::new(&[signal]) {
Ok(s) => s,
Err(e) => return Err((e, notifier)),
};
let notifier = Rc::new(RefCell::new(notifier));
let fail_notifier = notifier.clone();
let source = handle
.insert_source(source, {
let notifier = notifier.clone();
move |_, _, _| notifier.borrow_mut().signal_received()
})
.map_err(move |e| {
// the backend in the closure should already have been dropped
let notifier = Rc::try_unwrap(fail_notifier)
.unwrap_or_else(|_| unreachable!())
.into_inner();
(e.into(), notifier)
})?;
let kill_source = Box::new(move |source| handle.kill(source));
Ok(BoundDirectSession {
source,
notifier,
kill_source,
})
fn unregister(&mut self, poll: &mut calloop::Poll) -> std::io::Result<()> {
if let Some(mut source) = self.source.take() {
source.unregister(poll)
} else {
Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"This DirectSessionNotifier is not currently registered.",
))
}
}
}
/// Errors related to direct/tty sessions