2021-05-09 10:09:51 +02:00
|
|
|
import asyncio
|
|
|
|
from dbus_next.aio import MessageBus
|
|
|
|
from dbus_next.service import ServiceInterface, method
|
|
|
|
import dbus_next.introspection as intr
|
|
|
|
from dbus_next import BusType
|
|
|
|
|
|
|
|
|
|
|
|
class SampleInterface(ServiceInterface):
    """Minimal service interface registered under the name ``test.interface``.

    It exposes two methods through the ``@method()`` decorator: ``Ping``,
    which takes and returns nothing, and ``ConcatStrings``, which joins two
    strings.
    """

    def __init__(self):
        """Initialise the base ``ServiceInterface`` with the interface name."""
        interface_name = 'test.interface'
        super().__init__(interface_name)

    @method()
    def Ping(self):
        """Accept no arguments and produce no reply value."""
        return None

    @method()
    def ConcatStrings(self, what1: 's', what2: 's') -> 's':  # noqa: F821
        """Return ``what1`` immediately followed by ``what2``.

        The ``'s'`` annotations are signature strings consumed by
        ``@method()``, not Python types, hence the ``noqa`` marker.
        """
        joined = what1 + what2
        return joined
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
    """Exercise ``SampleInterface`` end to end over the system bus.

    One connection requests the well-known name ``dbus.next.sample`` and
    exports the interface at ``/test/path``. A second connection introspects
    that path, builds a proxy object and calls both methods, asserting on the
    replies.

    Both connections are disconnected before returning, whether the checks
    pass or fail.

    Raises:
        AssertionError: if introspection or a method call yields an
            unexpected result.
    """
    bus_name = 'dbus.next.sample'
    object_path = '/test/path'

    bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
    # Stays None if the second connection cannot be established, so the
    # cleanup below knows there is nothing to close for it.
    bus2 = None
    try:
        bus2 = await MessageBus(bus_type=BusType.SYSTEM).connect()

        await bus.request_name(bus_name)

        service_interface = SampleInterface()
        bus.export(object_path, service_interface)

        introspection = await bus2.introspect(bus_name, object_path)
        assert isinstance(introspection, intr.Node)
        obj = bus2.get_proxy_object(bus_name, object_path, introspection)
        interface = obj.get_interface(service_interface.name)

        result = await interface.call_ping()
        assert result is None

        result = await interface.call_concat_strings('hello ', 'world')
        assert result == 'hello world'
    finally:
        # Close the connections even when an assertion or a D-Bus call
        # fails, so no sockets are left open when the event loop shuts down.
        if bus2 is not None:
            bus2.disconnect()
        bus.disconnect()
|
|
|
|
|
|
|
|
|
|
|
|
# Script entry point: run the main() coroutine to completion.
asyncio.run(main())
|