Source code for science_jubilee.tools.PneumaticSampleLoader

import json
import time
from typing import Optional

import requests

from science_jubilee import Machine
from science_jubilee.tools import Tool
from science_jubilee.tools.Tool import (
    Tool,
    ToolConfigurationError,
    ToolStateError,
    requires_active_tool,
)


class PneumaticSampleLoader(Tool):
    """
    Interfaces the AFL sample loader tool with Jubilee deck.

    The loader is driven over HTTP. This class logs in to obtain a bearer
    token, polls ``/driver_status`` for the cell and arm state, and queues
    tasks (``prepareLoad``, ``loadSample``, ``rinseCell``) via ``/enqueue``.
    """

    def __init__(
        self, url, port, name, cell_location, safe_position, username, password
    ):
        """
        Store the connection settings, log in, and fetch the initial status.

        :param url: Base address of the sample loader server (without port).
        :param port: Port of the sample loader server; ``str`` or ``int``.
        :param name: Name of this tool.
        :param cell_location: Location passed to ``tool.dispense`` when a
            sample is transferred into the cell.
        :param safe_position: Sequence indexed as ``(x, y, z)`` that the
            machine is moved to before the loader is actuated.
        :param username: Username sent to the ``/login`` endpoint.
        :param password: Password sent to the ``/login`` endpoint.
        """
        # NOTE(review): Tool.__init__ is not invoked here (as in the original),
        # so `self._machine` must be assigned elsewhere before any method that
        # moves the machine is used -- confirm against the code that loads tools.
        self.name = name
        # Formatting (rather than `+`) lets an integer port, e.g. one read
        # from a JSON config file, work without raising TypeError.
        self.url = f"{url}:{port}"
        self.port = port
        self.safe_position = safe_position
        self.cell_location = cell_location
        self.username = username
        self.password = password
        self.arm_down_delay = 10

        self.login()
        self.update_status()

        self.index = None

    def login(self):
        """
        Authenticate against ``/login`` and cache the bearer-token header.

        :raises requests.HTTPError: If the server answers with an error status.
        """
        r = requests.post(
            self.url + "/login",
            json={"username": self.username, "password": self.password},
        )
        # Surface bad credentials / server errors directly instead of as an
        # opaque KeyError on the missing "token" field.
        r.raise_for_status()
        token = r.json()["token"]
        self.auth_header = {"Authorization": f"Bearer {token}"}

    @classmethod
    def from_config(cls, fp):
        """
        Build a loader from a JSON file whose keys match ``__init__``'s
        parameter names.

        :param fp: Path to the JSON configuration file.
        :return: A new :class:`PneumaticSampleLoader`.
        """
        with open(fp) as f:
            kwargs = json.load(f)
        # The original passed an undefined `index` positionally, which raised
        # NameError; __init__ takes no such argument.
        return cls(**kwargs)

    # NOTE(review): `@requires_active_tool` was commented out in the original
    # and is left disabled here.
    def load_sample(self, tool, sample_location: str, volume) -> bool:
        """
        Transfer a sample into the cell with ``tool`` and trigger loading.

        Prepares the cell, verifies the arm is up, picks up ``tool`` if it is
        not the active tool, aspirates from ``sample_location``, dispenses
        into ``self.cell_location``, retreats to the safe position, and then
        queues the ``loadSample`` task and blocks until the cell reports
        ``LOADED``.

        :param tool: Pipette-like tool (ie OT P300, HTTP syringe) exposing
            ``index``, ``aspirate`` and ``dispense``.
        :param sample_location: Location to aspirate the sample from.
        :param volume: Volume to transfer; also sent as ``sampleVolume``.
        :return: ``True`` once the cell reports ``LOADED``. Failures surface
            as exceptions rather than ``False``.
        :raises ValueError: If the arm is not ``UP`` after preparing the cell.
        """
        self.prepare_cell()

        # verify arm is raised
        if self.arm_state != "UP":
            raise ValueError("Arm is not raised")

        # sample transfer
        if self._machine.active_tool_index != tool.index:
            self._machine.park_tool()
            self._machine.pickup_tool(tool)

        tool.aspirate(volume, sample_location)
        tool.dispense(volume, self.cell_location)

        # Retreat: raise Z first, then travel to the safe XY at the safe Z.
        self._machine.safe_z_movement()
        self._machine.move_to(z=self.safe_position[2])
        self._machine.move_to(
            x=self.safe_position[0], y=self.safe_position[1], z=self.safe_position[2]
        )

        self._load_sample(volume)
        return True

    def rinse_cell(self) -> bool:
        """
        Move the machine to the safe position and rinse the sample cell.

        Blocks until the cell reports ``RINSED``.

        :return: ``True`` once the rinse has completed.
        """
        self._safe_position()
        self._rinse_cell()
        return True

    def prepare_cell(self):
        """
        Prepare the cell for loading - raise arm and make sure it is clean.

        Rinses first unless the cell already reports ``RINSED`` or ``READY``.
        """
        # Query the server once; the original issued two requests for one check.
        if self.get_cell_state() not in ("RINSED", "READY"):
            self.rinse_cell()

        self._safe_position()
        self._prepare_load()
        self.update_status()

    def _wait_for_cell_state(
        self,
        target_state: str,
        poll_interval: float = 1,
        timeout: Optional[float] = None,
    ) -> None:
        """
        Poll the server until the cell state equals ``target_state``.

        :param target_state: Cell state to wait for.
        :param poll_interval: Seconds to sleep between polls.
        :param timeout: Maximum seconds to wait; ``None`` waits indefinitely.
        :raises TimeoutError: If ``timeout`` elapses before the state is seen.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        while self.get_cell_state() != target_state:
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Timed out waiting for cell state {target_state!r}; "
                    f"last state was {self.cell_state!r}"
                )
            time.sleep(poll_interval)

    def _prepare_load(self):
        """
        Raise the arm of the pneumatic sample loader.

        Queues ``prepareLoad`` and blocks until the cell reports ``READY``.
        """
        self.enqueue({"task_name": "prepareLoad"})
        self._wait_for_cell_state("READY")

    def _load_sample(self, volume):
        """
        Queue ``loadSample`` for ``volume`` and block until the cell reports
        ``LOADED``.
        """
        self.enqueue({"task_name": "loadSample", "sampleVolume": volume})
        self._wait_for_cell_state("LOADED")

    def _rinse_cell(self):
        """
        Queue ``rinseCell`` and block until the cell reports ``RINSED``.
        """
        self.enqueue({"task_name": "rinseCell"})
        self._wait_for_cell_state("RINSED")

    def update_status(self) -> None:
        """
        Refresh the cached status of the sample loader.

        Fetches ``/driver_status``, stores the decoded JSON payload in
        ``self.status_list`` and the parsed values in ``self.cell_state`` and
        ``self.arm_state``.
        """
        r = requests.get(self.url + "/driver_status", headers=self.auth_header)
        status_str = r.content.decode("utf-8")
        self.status_list = json.loads(status_str)
        self.cell_state, self.arm_state = self.parse_state(self.status_list)

    def get_cell_state(self) -> str:
        """
        Refresh the status from the server and return the cell state.

        :return: The current state of the sample cell (e.g. ``'LOADED'``).
        """
        self.update_status()
        return self.cell_state

    def parse_state(self, status_list: list) -> tuple[str, str]:
        """
        Extract the cell state and arm state from the decoded status entries.

        Entries beginning with ``"State: "`` give the cell state and entries
        beginning with ``"Arm State: "`` give the arm state; if several
        entries match, the last one wins. Non-string entries are ignored.

        :param status_list: Iterable of status strings from the device.
        :return: ``(cell_state, arm_state)``, e.g. ``('LOADED', 'DOWN')``;
            each defaults to ``'UNKNOWN'`` when no matching entry is present.
        """
        cell_prefix = "State: "
        arm_prefix = "Arm State: "
        cell_state = "UNKNOWN"
        arm_state = "UNKNOWN"

        for item in status_list:
            if not isinstance(item, str):
                continue
            if item.startswith(cell_prefix):
                cell_state = item.removeprefix(cell_prefix)
            elif item.startswith(arm_prefix):
                arm_state = item.removeprefix(arm_prefix)

        return cell_state, arm_state

    def enqueue(self, task: dict):
        """
        Enqueue a task to be executed by the pneumatic sample loader.

        :param task: JSON-serialisable task description posted to ``/enqueue``.
        :return: The response body decoded as UTF-8 (the task uuid).
        :raises Exception: If the server does not answer with status 200.
        """
        r = requests.post(self.url + "/enqueue", headers=self.auth_header, json=task)
        if r.status_code != 200:
            raise Exception(f"Error enqueuing task: {r.json()}")
        return r.content.decode("utf-8")

    def _set_queue_paused(self, paused: bool, action: str) -> None:
        """
        Post the requested pause state to ``/pause``.

        :param paused: Value sent as the ``state`` field.
        :param action: Verb used in the error message ("pausing"/"unpausing").
        :raises Exception: If the server does not answer with status 200.
        """
        r = requests.post(
            self.url + "/pause", headers=self.auth_header, json={"state": paused}
        )
        if r.status_code != 200:
            raise Exception(f"Error {action} queue: {r.json()}")

    def unpause_queue(self):
        """
        Resume the task queue by posting ``state=False`` to ``/pause``.
        """
        self._set_queue_paused(False, "unpausing")

    def pause_queue(self):
        """
        Pause the task queue by posting ``state=True`` to ``/pause``.
        """
        self._set_queue_paused(True, "pausing")

    def _safe_position(self):
        """
        Check if currently in the safe position, and if not, move to it.
        """
        if not self.get_safety_state():
            self._machine.safe_z_movement()
            self._machine.move_to(
                x=self.safe_position[0],
                y=self.safe_position[1],
                z=self.safe_position[2],
            )

    def get_safety_state(self):
        """
        Return whether the machine is currently within the safe bounds.

        The position is unsafe when x is greater than the safe x, or when y
        or z is less than the corresponding safe coordinate.

        :return: ``True`` when all three coordinates are within bounds.
        """
        positions = self._machine.position
        x, y, z = positions[0], positions[1], positions[2]
        safe_x, safe_y, safe_z = (
            self.safe_position[0],
            self.safe_position[1],
            self.safe_position[2],
        )
        return not (x > safe_x or y < safe_y or z < safe_z)