"""Routines and classes related to RPyC package"""
from . import module as module_utils, net, py3, strpack
import numpy as np
import importlib
rpyc=importlib.import_module("rpyc") # Python 2 compatibility (importing module from a module with the same name)
import pickle
import warnings
import socket
# Named serializers which can be selected through the `packer` argument of ``tunnel_send``.
# ``ndarray.tobytes`` is used instead of the ``ndarray.tostring`` alias, which is deprecated and removed in NumPy 2.0
# (with the old name this module fails at import time there); both produce the same bytes.
_default_packers={"numpy":np.ndarray.tobytes,"pickle":pickle.dumps}
# Named deserializers which can be selected through the `unpacker` argument of ``tunnel_recv``.
_default_unpackers={"pickle":pickle.loads}
def _is_tunnel_service(serv):
    """Check whether `serv` has a ``tunnel_socket`` attribute."""
    try:
        serv.tunnel_socket
    except AttributeError:
        return False
    return True
def _obtain_single(proxy, serv):
    """
    Obtain a single remote object by value.

    If `serv` passes :func:`_is_tunnel_service`, its ``tunnel_send`` is invoked asynchronously with the ``"pickle"`` packer,
    and the data is then read via ``serv.peer.tunnel_recv`` and unpickled.
    Otherwise, fall back to :func:`rpyc.classic.obtain`.
    """
    if not _is_tunnel_service(serv):
        return rpyc.classic.obtain(proxy)
    receiver=serv.peer
    # the send is started asynchronously (presumably so that it does not block before the receiving below starts)
    rpyc.async_(serv.tunnel_send)(proxy,packer="pickle")
    payload=receiver.tunnel_recv()
    return pickle.loads(payload)
# Size threshold in bytes (2**20): larger numpy arrays are transferred by `obtain` through the tunnel or block by block
_numpy_block_size=1<<20
def obtain(proxy, serv=None):
    """
    Obtain a remote netref object by value (i.e., copy it to the local Python instance).

    Wrapper around :func:`rpyc.utils.classic.obtain` with some special cases handling.
    `serv` specifies the current remote service. If it is of type :class:`SocketTunnelService`, use its socket tunnel for faster transfer.

    Objects which are not netrefs are returned as is.
    Numpy arrays larger than ``_numpy_block_size`` bytes are transferred either through the socket tunnel (if `serv` has one),
    or block by block via :func:`rpyc.classic.obtain`; in both cases the result is a new writable local array.
    Everything else is passed directly to :func:`rpyc.classic.obtain`.
    """
    if not isinstance(proxy,rpyc.BaseNetref):
        return proxy
    if isinstance(proxy, np.ndarray):
        # attribute access on a netref goes to the remote side, so read the array metadata only once
        shape=tuple(int(d) for d in proxy.shape)
        dtype=np.dtype(str(proxy.dtype.str))
        elsize=int(np.prod(shape,dtype="u8"))
        bytesize=dtype.itemsize*elsize
        if bytesize>_numpy_block_size:
            if _is_tunnel_service(serv):
                loc_serv=serv.peer
                async_send=rpyc.async_(serv.tunnel_send)
                async_send(proxy,packer="numpy")
                data=loc_serv.tunnel_recv()
                # np.frombuffer over a bytes object gives a read-only view;
                # copy it so that the result is writable, same as for the other transfer paths
                return np.frombuffer(data,dtype=dtype).reshape(shape).copy()
            else:
                fproxy=proxy.flatten()
                loc=np.zeros(elsize,dtype=dtype)
                # at least one element per block, so that the range step is never zero for very large item sizes
                block_size=max(1,_numpy_block_size//dtype.itemsize)
                for pos in range(0,elsize,block_size):
                    loc[pos:pos+block_size]=rpyc.classic.obtain(fproxy[pos:pos+block_size])
                return loc.reshape(shape)
    return rpyc.classic.obtain(proxy)
def transfer(obj, serv):
    """
    Copy a local object to the remote Python instance (i.e., send it to the remote PC by value).

    Acts as a 'reversed' :func:`obtain`; the work is delegated to ``serv.transfer``, whose result is returned.
    """
    remote_copy=serv.transfer(obj)
    return remote_copy
class SocketTunnelService(rpyc.SlaveService):
    """
    Extension of the standard :class:`rpyc.core.service.SlaveService` with built-in network socket tunnel for faster data transfer.

    In order for the tunnel to work, services on both ends need to be subclasses of :class:`SocketTunnelService`.
    Because of the initial setup protocol, the two services are asymmetric: one should be 'server' (corresponding to the listening server),
    and one should be 'client' (external connection). The roles are decided by the `server` constructor parameter.
    """
    # maximal length of a single chunk sent through the tunnel
    _tunnel_block_size=int(2**20)
    # timeout value passed to the tunnel sockets and to the tunnel listener
    _default_tunnel_timeout=10.
    # NOTE: ``tunnel_socket`` is intentionally not pre-initialized here:
    # ``_is_tunnel_service`` recognizes a tunnel service by the presence of this attribute
    def __init__(self, server=False):
        rpyc.SlaveService.__init__(self)
        self.server=server
    def _recv_socket(self, addr):
        """
        Set up a listener to receive a socket connection from the other service.

        `addr` is the address which the other service is asked to connect to (passed to its ``_send_socket`` together with the listening port).
        """
        def listen(s):
            s.set_timeout(self._default_tunnel_timeout)
            self.tunnel_socket=s
        remote_call=rpyc.async_(self._conn.root._send_socket)
        def port_func(port):
            # asynchronously ask the other service to connect to the port supplied by the listener
            remote_call(addr,port)
        net.listen(None,0,listen,port_func=port_func,timeout=self._default_tunnel_timeout,connections_number=1,socket_args={"nodelay":True})
    def _send_socket(self, dst_addr, dst_port):
        """Set up a client socket to connect to the other service."""
        self.tunnel_socket=net.ClientSocket(timeout=self._default_tunnel_timeout,nodelay=True)
        self.tunnel_socket.connect(dst_addr,dst_port)
    def tunnel_send(self, obj, packer=None):
        """
        Send data through the socket tunnel.

        If `packer` is not ``None``, it defines a function to convert `obj` to a bytes string;
        it can also be a name of one of the default packers (``"numpy"`` or ``"pickle"``).
        The data is sent as a 4-byte chunks number followed by the chunks, each at most ``_tunnel_block_size`` long.
        """
        packer=_default_packers.get(packer,packer)
        if packer:
            obj=packer(obj)
        # ceiling division; gives 0 chunks for empty data
        nchunks=(len(obj)-1)//self._tunnel_block_size+1
        self.tunnel_socket.send_fixedlen(strpack.pack_uint(nchunks,4,">"))
        for pos in range(0,len(obj),self._tunnel_block_size):
            self.tunnel_socket.send_decllen(obj[pos:pos+self._tunnel_block_size])
    def tunnel_recv(self, unpacker=None):
        """
        Receive data sent through the socket tunnel.

        If `unpacker` is not ``None``, it defines a function to convert the received bytes string into an object;
        it can also be a name of one of the default unpackers (``"pickle"``).
        Return the unpacked object, or the raw bytes string if no unpacker is supplied.
        """
        nchunks=strpack.unpack_uint(self.tunnel_socket.recv_fixedlen(4),">")
        chunks=[]
        for _ in range(nchunks):
            chunks.append(self.tunnel_socket.recv_decllen())
        obj=b"".join(chunks)
        unpacker=_default_unpackers.get(unpacker,unpacker)
        return unpacker(obj) if unpacker else obj
    def obtain(self, proxy):
        """Execute :func:`obtain` on the local instance"""
        return obtain(proxy,self)
    def transfer(self, obj):
        """Execute :func:`transfer` on the local instance"""
        return self.peer.obtain(obj)
    def on_connect(self, conn):
        """
        Store the root of the other side as ``peer`` and, for a non-server service, start the tunnel setup.

        The local address of the RPyC connection socket is used as the address for the other service to connect to.
        """
        rpyc.SlaveService.on_connect(self,conn)
        self.peer=conn.root
        if not self.server:
            # socket.fromfd duplicates the descriptor, so closing this temporary socket does not close the RPyC connection
            s=socket.fromfd(conn.fileno(),socket.AF_INET,socket.SOCK_STREAM)
            try:
                src_addr=s.getsockname()[0]
            finally:
                # release the duplicated descriptor even if the address query fails
                s.close()
            self._recv_socket(src_addr)
    def on_disconnect(self, conn):
        """Close the tunnel socket (if it has been set up) and perform the standard disconnect handling."""
        try:
            self.tunnel_socket.close()
        except AttributeError:
            # the tunnel has never been established
            pass
        rpyc.SlaveService.on_disconnect(self,conn)
class DeviceService(SocketTunnelService):
    """
    Device RPyC service.

    Expands on :class:`SocketTunnelService` by adding :meth:`get_device` method,
    which opens local devices, tracks them, and closes them automatically on disconnect.
    If `verbose` is ``True``, print a message on every client connection and disconnection.
    """
    def __init__(self, verbose=False):
        SocketTunnelService.__init__(self,server=True)
        self.verbose=verbose
    def on_connect(self, conn):
        """Perform the tunnel service connection handling and reset the list of opened devices."""
        SocketTunnelService.on_connect(self,conn)
        self.devices=[]
        if self.verbose:
            print("Connected client {}".format(self._conn))
    def on_disconnect(self, conn):
        """Close all devices opened through :meth:`get_device`, then perform the tunnel service disconnect handling."""
        for dev in self.devices:
            try:
                dev.close()
            except Exception:
                # closing is best-effort: a failure for one device should not prevent closing the rest;
                # KeyboardInterrupt and SystemExit are deliberately not suppressed
                pass
        self.devices=[]
        if self.verbose:
            print("Disconnected client {}".format(self._conn))
        SocketTunnelService.on_disconnect(self,conn)
    def get_device(self, module, cls, *args, **kwargs):
        """
        Connect to a device.

        `cls` and `module` are names of the device class and the containing module
        (for module name the ``"pylablib.aux_libs.devices"`` prefix can be omitted).
        `args` and `kwargs` are passed to the class constructor.
        The created device is stored in the list of opened devices (closed on disconnect) and returned.
        """
        try:
            module=importlib.import_module(module)
        except ModuleNotFoundError:
            # retry treating the name as relative to the library devices package
            module=importlib.import_module(module_utils.get_library_name()+".aux_libs.devices."+module)
        # mark the module as being used through RPyC
        module._rpyc=True
        cls=module.__dict__[cls]
        dev=cls(*args,**kwargs)
        self.devices.append(dev)
        return dev
def run_device_service(port=18812, verbose=False):
    """
    Start :class:`DeviceService` at the given port.

    The service is hosted by :class:`rpyc.ThreadedServer`; `verbose` is passed to the :class:`DeviceService` constructor.
    """
    service_factory=rpyc.utils.helpers.classpartial(DeviceService,verbose=verbose)
    server=rpyc.ThreadedServer(service_factory,port=port)
    server.start()
def connect_device_service(addr, port=18812, timeout=3, attempts=2):
    """
    Connect to the :class:`DeviceService` running at the given address and port.

    `timeout` and `attempts` define respectively timeout of a single connection attempt, and the number of attempts
    (RPyC default is 3 seconds timeout and 6 attempts).
    Return the root object of the established connection, or ``None`` if a socket timeout occurs.
    Warnings raised while connecting are suppressed.
    """
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        try:
            stream=rpyc.SocketStream.connect(addr,port,timeout=timeout,attempts=attempts)
            connection=rpyc.connect_stream(stream,SocketTunnelService)
            return connection.root
        except net.socket.timeout:
            return None