"""Driver for Controlling Jubilee"""
# import websocket # for reading the machine model
import json
import logging
import os
import time
import warnings
from functools import wraps
from pathlib import Path
from typing import Union
import requests # for issuing commands
from requests.adapters import HTTPAdapter, Retry
from science_jubilee.decks.Deck import Deck
from science_jubilee.tools.Tool import Tool
# TODO: Figure out how to print error messages from the Duet.
[docs]
logger = logging.getLogger(__name__)
# copied from machine agency version, may not be needed here
[docs]
def get_root_dir():
"""Return the path to the duckbot directory."""
return Path(__file__).parent.parent
##########################################
# ERRORS
##########################################
[docs]
class MachineConfigurationError(Exception):
"""Raise this error if there is something wrong with how the machine is configured"""
pass
[docs]
class MachineStateError(Exception):
"""Raise this error if the machine is in the wrong state to perform the requested action."""
pass
##########################################
# DECORATORS
##########################################
[docs]
def machine_homed(func):
"""Decorator used to check if the machine is homed before performing certain actions."""
def homing_check(self, *args, **kwds):
# Check the cached value if one exists.
if self.simulated:
return func(self, *args, **kwds)
if self.axes_homed and all(self.axes_homed):
return func(self, *args, **kwds)
# Request homing status from the object model if not known.
self.axes_homed = json.loads(self.gcode('M409 K"move.axes[].homed"'))["result"]
if not all(self.axes_homed):
raise MachineStateError("Error: machine must first be homed.")
return func(self, *args, **kwds)
return homing_check
[docs]
def requires_deck(func):
"""Decorator used ot check if a deck has been configured before performing certain actions."""
def deck_check(self, *args, **kwds):
if self.deck is None:
raise MachineStateError("Error: No deck is set up")
return func(self, *args, **kwds)
return deck_check
[docs]
def requires_safe_z(func):
"""Decorator used to ensure the deck is at a safe height before performing certain actions."""
def z_check(self, *args, **kwds):
current_z = float(self.get_position()["Z"])
if self.deck:
safe_z = self.deck.safe_z
else:
safe_z = 0
# warnings.warn(f"No deck configured, safe z height has been set to {safe_z}. Please modify this if needed.")
if current_z < safe_z:
self.move_to(z=safe_z + 20)
return func(self, *args, **kwds)
return z_check
##########################################
# MACHINE CLASS
##########################################
[docs]
class Machine:
"""A class representation of Jubilee used to send motion commands and polling the machine state."""
# TODO: Set this up so that a keyboard interrupt leaves the machine in a safe state - ie tool offsets correct. I had an issue
# where I keyboard interrupted during pipette tip pickup - tip was picked up but offset was not applied, crashing machine on next move. This should not be possible.
[docs]
LOCALHOST = "192.168.1.2"
def __init__(
self,
port: str = None,
baudrate: int = 115200,
address: str = None,
deck_config: str = None,
simulated: bool = False,
crash_detection: bool = False,
crash_handler=None,
):
"""Initialize the Machine object.
:param port: The port to connect to the machine over serial, defaults to None
:type port: str, optional
:param baudrate: The baudrate to use when connecting to the machine, defaults to 115200
:type baudrate: int, optional
:param address: The IP address of the machine. This should match what is loaded onto the config.g on the Jubilee Duet's main board, defaults to None
:type address: str, optional
:param deck_config: The name of the deck configuration file to load, defaults to None
:type deck_config: str, optional
:param simulated: Whether to simulate the machine, defaults to False
:type simulated: bool, optional
:param crash_detection: Whether to monitor for tool changer crash detection. See science-jubilee docs for more. Default to False (no detection)
:type crash_detection: bool
:param crash_handler: Function to call when crash is detected. See docs
:type crash_handler: None or function
:raises MachineStateError: If the machine is not in the correct state to perform the requested action. This is a user error, not a machine error.
:raises MachineConfigurationError: If the machine does nto support the indicated configuration, e.g., a tool index is already in use.
:raises ValueError: If Jubilee returns an invalid value, e.g., the axis limit queried is not correct or query is unsuccessful.
"""
if address != self.__class__.LOCALHOST:
print(
"Warning: disconnecting this application from the network will halt connection to Jubilee."
)
# Machine Specs
# serial info
[docs]
self.baudrate = baudrate
[docs]
self.lineEnding = "\n" # serial stuff
# HTTP info
# self.debug = debug
[docs]
self.simulated = simulated
[docs]
self.model_update_timestamp = 0
[docs]
self.wake_time = None # Next scheduled time that the update thread updates.
# crash detection
[docs]
self.crash_detection = crash_detection
[docs]
self.crash_handler = crash_handler
[docs]
self._absolute_positioning = True
[docs]
self._absolute_extrusion = (
True # Extrusion positioning is set separately from other axes
)
[docs]
self._axis_limits = (None, None, None) # Cached value under the @property.
[docs]
self.axes_homed = [
False
] * 4 # We have at least X/Y/Z/U axes to home. Additional axes handled below in connect()
# TODO: this is confusingly named
[docs]
self.current_well = None
requests_session = requests.Session()
retries = Retry(
total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504]
)
requests_session.mount("http://", HTTPAdapter(max_retries=retries))
requests_session.headers["Connection"] = "close"
[docs]
self.session = requests_session
if deck_config is not None:
self.load_deck(deck_config)
self.connect()
self._set_absolute_positioning() # force=True)
[docs]
def connect(self):
"""Connects to Jubilee over http.
:raises MachineStateError: If the connection to the machine is unsuccessful.
"""
# TODO: incorporate serial connection from machine agency version
if self.simulated:
return
# Do the equivalent of a ping to see if the machine is up.
# if self.debug:
# print(f"Connecting to {self.address} ...")
try:
# "Ping" the machine by updating the only cacheable information we care about.
# TODO: This should handle a response from self.gcode of 'None' gracefully.
max_tries = 50
for i in range(max_tries):
response = json.loads(self.gcode('M409 K"move.axes[].homed"'))[
"result"
][:4]
if len(response) == 0:
continue
else:
break
self.axes_homed = response
# These data members are tied to @properties of the same name
# without the '_' prefix.
# Upon reconnecting, we need to flag that the @property must
# refresh; otherwise we will retrieve old values that may be invalid.
self._active_tool_index = None
self._tool_z_offsets = None
self._axis_limits = None
# To save time upon connecting, let's just hit the API on the
# first try for all the @properties we care about.
self.configured_axes
self.active_tool_index
self.tool_z_offsets
self.axis_limits
# pprint.pprint(json.loads(requests.get("http://127.0.0.1/machine/status").text))
# TODO: recover absolute/relative from object model instead of enforcing it here.
self._set_absolute_positioning()
except json.decoder.JSONDecodeError as e:
raise MachineStateError("DCS not ready to connect.") from e
except requests.exceptions.Timeout as e:
raise MachineStateError(
"Connection timed out. URL may be invalid, or machine may not be connected to the network."
) from e
# if self.debug:
# print("Connected.")
@property
@property
@property
@active_tool_index.setter
def active_tool_index(self, tool_index: int):
"""Sets the current tool, and toggle the old tool off."""
if self.tool is not None:
self.tool.is_active_tool = False
if tool_index < 0:
self._active_tool_index = -1
self.tool = None
else:
self._active_tool_index = tool_index
if tool_index not in self.tools:
temp_tool = Tool(tool_index, "temp_tool")
self.load_tool(temp_tool)
tool = self.tools[tool_index]["tool"]
self.tool = tool
tool.is_active_tool = True
@property
@property
[docs]
def axis_limits(self):
"""Return (in XYZU order) a list of tuples specifying (min, max) axis limit
:return: A list of tuples specifying (min, max) axis limit
:rtype: list
Note: This list is obtained directly from the tools added to the machine's `config.g` file.
"""
# Starting from fresh connection, query from the Duet.
if self._axis_limits is None:
try:
max_tries = 50
for i in range(max_tries):
response = json.loads(self.gcode('M409 K"move.axes"'))["result"]
if len(response) == 0:
continue
else:
break
# pprint.pprint(response)
self._axis_limits = [] # Create a fresh list.
for axis_data in response:
axis_min = axis_data["min"]
axis_max = axis_data["max"]
self._axis_limits.append((axis_min, axis_max))
except ValueError as e:
print("Error occurred trying to read axis limits on each axis!")
raise e
# Return the cached value.
return self._axis_limits
@property
[docs]
def position(self):
"""Returns the current machine control point in mm.
:return: A dictionary of the machine control point in mm. The keys are the axis name, e.g. 'X'
:rtype: dict
"""
# Axes are ordered X, Y, Z, U, E, E0, E1, ... En, where E is a copy of E0.
response_chunks = self.gcode("M114").split()
positions = [float(a.split(":")[1]) for a in response_chunks[:3]]
return positions
##########################################
# BED PLATE
##########################################
[docs]
def load_deck(
self,
deck_filename: str,
path: str = os.path.join(os.path.dirname(__file__), "decks", "deck_definition"),
):
"""Load a deck configuration file onto the machine.
:param deck_filename: The name of the deck configuration file.
:type deck_filename: str
:param path: The path to the deck configuration `.json` files for the labware,
defaults to the 'deck_definition/' in the science_jubilee/decks directory.
:type path: str, optional
:return: A :class:`Deck` object
:rtype: :class:`Deck`
"""
deck = Deck(deck_filename, path=path)
self.deck = deck
return deck
[docs]
def gcode(self, cmd: str = "", timeout=None, response_wait: float = 60):
"""Send a G-Code command to the Machine and return the response.
:param cmd: The G-Code command to send, defaults to ""
:type cmd: str, optional
:param timeout: The time to wait for a response from the machine, defaults to None
:type timeout: float, optional
:param response_wait: The time to wait for a response from the machine, defaults to 30
:type response_wait: float, optional
:return: The response message from the machine. If too long, the message might not display in the terminal.
:rtype: str
"""
# TODO: Add serial option for gcode commands from MA
if self.simulated:
print(f"sending: {cmd}")
return None
try:
# Try sending the command with requests.post
response = requests.post(
f"http://{self.address}/machine/code", data=f"{cmd}", timeout=timeout
).text
if "rejected" in response:
raise requests.RequestException
except requests.RequestException:
# If requests.post fails ( not supported for standalone mode), try sending the command with requests.get
try:
# Paraphrased from Duet HTTP-requests page:
# Client should query `rr_model?key=seqs` and monitor `seqs.reply`. If incremented, the command went through
# and the response is available at `rr_reply`.
reply_response = self.session.get(
f"http://{self.address}/rr_model?key=seqs"
)
logging.debug(
f"MODEL response, status: {reply_response.status_code}, headers:{reply_response.headers}, content:{reply_response.content}"
)
reply_count = reply_response.json()["result"]["reply"]
buffer_response = self.session.get(
f"http://{self.address}/rr_gcode?gcode={cmd}", timeout=timeout
)
logging.debug(
f"GCODE response, status: {buffer_response.status_code}, headers:{buffer_response.headers}, content:{buffer_response.content}"
)
# wait for a response code to be appended
# TODO: Implement retry backoff for managing long-running operations to avoid too many requests error. Right now this is handled by the generic exception catch then sleep. Real fix is some sort of backoff for things running longer than a few seconds.
tic = time.time()
try_count = 0
while True:
try:
new_reply_response = self.session.get(
f"http://{self.address}/rr_model?key=seqs"
)
logger.debug(
f"MODEL response, status: {new_reply_response.status_code}, headers:{new_reply_response.headers}, content:{new_reply_response.content}"
)
new_reply_count = new_reply_response.json()["result"]["reply"]
if new_reply_count != reply_count:
response = self.session.get(
f"http://{self.address}/rr_reply"
)
logger.debug(
f"REPLY response, status: {response.status_code}, headers:{response.headers}, content:{response.content}"
)
response = response.text
# crash detection monitoring happens here
if self.crash_detection:
if "crash detected" in response:
logger.error("Jubilee crash detected")
handler_response = self.crash_handler.handle_crash()
break
elif time.time() - tic > response_wait:
response = None
break
time.sleep(self.delay_time(try_count))
try_count += 1
except Exception as e:
print(f"Connection error ({e}), sleeping 1 second")
logging.debug(f"Error in gcode reply wait loop: {e}")
time.sleep(2)
continue
except requests.RequestException as e:
print(f"Both `requests.post` and `requests.get` requests failed: {e}")
response = None
# TODO: handle this with logging. Also fix so all output goes to logs
return response
[docs]
def delay_time(self, n):
"""
Calculate delay time for next request. dumb hard code for now, could be fancy exponential backoff
"""
if n == 0:
return 0
if n < 10:
return 0.1
if n < 20:
return 0.2
if n < 30:
return 0.3
else:
return 1
[docs]
def _set_absolute_positioning(self):
"""Set absolute positioning for all axes except extrusion"""
self.gcode("G90")
self._absolute_positioning = True
[docs]
def _set_relative_positioning(self):
"""Set relative positioning for all axes except extrusion"""
self.gcode("G91")
self.absolute_positioning = False
[docs]
def _set_absolute_extrusion(self):
"""Set absolute positioning for extrusion"""
self.gcode("M82")
self._absolute_extrusion = True
[docs]
def _set_relative_extrusion(self):
"""Set relative positioning for extrusion"""
self.gcode("M83")
self.absolute_extrusion = False
[docs]
def push_machine_state(self):
"""Push machine state onto a stack"""
self.gcode("M120")
[docs]
def pop_machine_state(self):
"""Recover previous machine state"""
self.gcode("M121")
[docs]
def download_file(self, filepath: str = None, timeout: float = None):
"""Download a file into a file object. Full machine filepath must be specified.
Example: /sys/tfree0.g
:param filepath: The full filepath of the file to download, defaults to None
:type filepath: str, optional
:param timeout: The time to wait for a response from the machine, defaults to None
:type timeout: float, optional
:return: The file contents
:rtype: file object
"""
# RRF3 Only
file_contents = requests.get(
f"http://{self.address}/rr_download?name={filepath}", timeout=timeout
)
return file_contents
[docs]
def reset(self):
"""Issue a software reset."""
# End the subscribe thread first.
self.gcode("M999") # Issue a board reset. Assumes we are already connected
self.axes_homed = [False] * 4
self.disconnect()
print("Reconnecting...")
for i in range(10):
time.sleep(1)
try:
self.connect()
return
except MachineStateError as e:
pass
raise MachineStateError("Reconnecting failed.")
[docs]
def home_all(self):
"""Home all axes."""
# Having a tool is only possible if the machine was already homed.
# TODO: Check if machine is already homed and have a user input to verify clear deck to avoid wasting time by accidentally rerunning and \
# avoid major deck wrecks
# TODO: Catch errors where tool is already on and forward to user for fix
if self.active_tool_index != -1:
self.park_tool()
self.gcode("G28")
self._set_absolute_positioning()
# Update homing state. Do not query the object model because of race condition.
self.axes_homed = [True, True, True, True] # X, Y, Z, U
### test to see if we can get the number of axis home using the pop_machine_state(self) !! MP 07/25/23
[docs]
def home_xyu(self):
"""Home the XYU axes. Home Y before X to prevent possibility of crashing into the tool rack."""
self.gcode("G28 Y")
self.gcode("G28 X")
self.gcode("G28 U")
self._set_absolute_positioning()
# Update homing state. Pull Z from the object model which will not create a race condition.
z_home_status = json.loads(self.gcode('M409 K"move.axes[].homed"'))["result"][2]
self.axes_homed = [True, True, z_home_status, True]
[docs]
def home_x(self):
"""Home the X axis"""
cmd = "G28 X"
self.gcode(cmd)
[docs]
def home_y(self):
"""Home the Y axis"""
cmd = "G28 Y"
self.gcode(cmd)
[docs]
def home_u(self):
"""Home the U (tool) axis"""
cmd = "G28 U"
self.gcode(cmd)
[docs]
def home_v(self):
"""Home the V axis"""
cmd = "G28 V"
self.gcode(cmd)
[docs]
def home_z(self):
"""Home the Z axis.
Note: The deck must be clear first. Will ask for user input to verify.
"""
response = input("Is the Deck free of obstacles? [y/n]")
if response.lower() in ["y", "yes", "Yes", "Y", "YES"]:
self.gcode("G28 Z")
else:
print("The deck needs to be empty of all labware before proceeding.")
self._set_absolute_positioning()
[docs]
def home_e(self):
"""
Home the extruder axis (syringe)
"""
pass
[docs]
def home_in_place(self, *args: str):
"""Set the current location of a machine axis or axes to 0."""
for axis in args:
if axis.upper() not in ["X", "Y", "Z", "U"]:
raise TypeError(f"Error: cannot home unknown axis: {axis}.")
self.gcode(f"G92 {axis.upper()}0")
@machine_homed
[docs]
def _move_xyzev(
self,
x: float = None,
y: float = None,
z: float = None,
e: float = None,
v: float = None,
s: float = 6000,
param: str = None,
wait: bool = False,
):
"""Move X/Y/Z/E/V axes. Set absolute/relative mode externally.
:param x: x position on the bed, in whatever units have been set (default mm)
:type x: float, optional
:param y: y position on the bed, in whatever units have been set (default mm)
:type y: float, optional
:param z: z position on the bed, in whatever units have been set (default mm)
:type z: float, optional
:param e: extruder position, in whatever units have been set (default mm)
:type e: float, optional
:param v: v axis position, in whatever units have been set (default mm)
:type v: float, optional
:param s: speed at which to move (default 6000 mm/min)
:type s: float, optional
"""
x = "{0:.2f}".format(x) if x is not None else None
y = "{0:.2f}".format(y) if y is not None else None
z = "{0:.2f}".format(z) if z is not None else None
e = "{0:.2f}".format(e) if e is not None else None
v = "{0:.2f}".format(v) if v is not None else None
s = "{0:.2f}".format(s)
# initialize coordinates commands
x_cmd = y_cmd = z_cmd = e_cmd = v_cmd = f_cmd = param_cmd = ""
if x is not None:
x_cmd = f"X{x}"
if y is not None:
y_cmd = f"Y{y}"
if z is not None:
z_cmd = f"Z{z}"
if e is not None:
e_cmd = f"E{e}"
if v is not None:
v_cmd = f"V{v}"
if s is not None:
f_cmd = f"F{s}"
if param is not None:
param_cmd = param
cmd = f"G0 {z_cmd} {x_cmd} {y_cmd} {e_cmd} {v_cmd} {f_cmd} {param_cmd}"
self.gcode(cmd)
if wait:
self.gcode(f"M400")
[docs]
def move_to(
self,
x: float = None,
y: float = None,
z: float = None,
e: float = None,
v: float = None,
s: float = 6000,
param: str = None,
wait: bool = True,
):
"""Move to an absolute X/Y/Z/E/V position.
:param x: x position on the bed, in whatever units have been set (default mm)
:type x: float, optional
:param y: y position on the bed, in whatever units have been set (default mm)
:type y: float, optional
:param z: z position on the bed, in whatever units have been set (default mm)
:type z: float, optional
:param e: extruder position, in whatever units have been set (default mm)
:type e: float, optional
:param v: v axis position, in whatever units have been set (default mm)
:type v: float, optional
:param s: speed at which to move (default 6000 mm/min)
:type s: float, optional
"""
self._set_absolute_positioning()
self._move_xyzev(x=x, y=y, z=z, e=e, v=v, s=s, param=param, wait=wait)
[docs]
def move(
self,
dx: float = 0,
dy: float = 0,
dz: float = 0,
de: float = 0,
dv: float = 0,
s: float = 6000,
param: str = None,
wait: bool = True,
):
"""Move relative to the current position
:param dx: change in x position, in whatever units have been set (default mm)
:type dx: float, optional
:param dy: change in y position, in whatever units have been set (default mm)
:type dy: float, optional
:param dz: change in z position, in whatever units have been set (default mm)
:type dz: float, optional
:param de: change in e position, in whatever units have been set (default mm)
:type de: float, optional
:param dv: change in v position, in whatever units have been set (default mm)
:type dv: float, optional
:param s: speed at which to move (default 6000 mm/min)
:type s: float, optional
"""
# Check that the relative move doesn't exceed user-defined limit
# By default, ensure that it won't crash into the parked tools
if any(self._axis_limits):
x_limit, y_limit, z_limit = self._axis_limits[0:3]
pos = self.get_position()
if (
x_limit
and dx != 0
and (
(float(pos["X"]) + dx > x_limit[1])
or (float(pos["X"]) + dx < x_limit[0])
)
):
raise MachineStateError("Error: Relative move exceeds X axis limit!")
if (
y_limit
and dy != 0
and (
(float(pos["Y"]) + dy > y_limit[1])
or (float(pos["Y"]) + dy < y_limit[0])
)
):
raise MachineStateError("Error: Relative move exceeds Y axis limit!")
if (
z_limit
and dz != 0
and (
(float(pos["Z"]) + dz > z_limit[1])
or (float(pos["Z"]) + dz < z_limit[0])
)
):
raise MachineStateError("Error: Relative move exceeds Z axis limit!")
self._set_relative_positioning()
self._move_xyzev(x=dx, y=dy, z=dz, e=de, v=dv, s=s, param=param, wait=wait)
[docs]
def dwell(self, t: float, millis: bool = True):
"""Pauses the machine for a period of time.
:param t: time to pause, in milliseconds by default
:type t: float
:param millis: boolean, set to false to use seconds. default unit is milliseconds.
:type millis: bool, optional
"""
param = "P" if millis else "S"
cmd = f"G4 {param}{t}"
self.gcode(cmd)
[docs]
def safe_z_movement(self):
"""Move the Z axis to a safe height to avoid crashing into labware."""
# TODO is this redundant? can we reuse decorator ?
current_z = self.get_position()["Z"]
safe_z = self.deck.safe_z
if float(current_z) < safe_z:
self.move_to(z=safe_z + 20)
else:
pass
# TODO: Unload tool method
@requires_safe_z
@requires_safe_z
[docs]
def get_position(self):
"""Get the current position of the machine control point in mm.
:return: A dictionary of the machine control point in mm. The keys are the axis name, e.g. 'X'
:rtype: dict
"""
max_tries = 50
for i in range(max_tries):
resp = self.gcode("M114")
if "Count" not in resp:
continue
else:
break
positions = {}
keyword = " Count " # this is the keyword hosts like e.g. pronterface search for to track position
keyword_idx = resp.find(keyword)
count = 0
if keyword_idx > -1:
resp = resp[:keyword_idx]
position_elements = resp.split(" ")
for e in position_elements:
axis, pos = e.split(":", 2)
positions[axis] = pos
return positions
[docs]
def load_labware(
self, labware_filename: str, slot: int, path: str = None, order: str = "rows"
):
"""Function that loads a labware and associates it with a specific slot on the deck.
The slot offset is also applied to the labware asocaite with it.
:param labware_filename: The name of the labware configuration file.
:type labware_filename: str
:param slot: The index of the slot to load the labware into.
:type slot: int
:param path: The path to the labware configuration `.json` files for the labware.
:type path: str, optional
:param order: The order in which the labware is arranged on the deck.
Can be 'rows' or 'columns', defaults to 'rows'.
:type order: str, optional
:return: The :class:`Labware` object that has been loaded into the slot.
:rtype: :class:`Labware`
"""
if path is not None:
labware = self.deck.load_labware(
labware_filename, slot, path=path, order=order
)
else:
labware = self.deck.load_labware(labware_filename, slot, order=order)
return labware
# ***************MACROS***************
[docs]
def disconnect(self):
"""Close the connection."""
# Nothing to do?
pass
[docs]
def __enter__(self):
return self
[docs]
def __exit__(self, *args):
self.disconnect()