import json
import logging
import os
import time
from itertools import dropwhile, takewhile
from typing import Iterator, List, Tuple, Union
import numpy as np
import requests
from science_jubilee.labware.Labware import Labware, Location, Well
from science_jubilee.tools.Tool import (
Tool,
ToolConfigurationError,
ToolStateError,
requires_active_tool,
)
[docs]
class HTTPSyringe(Tool):
def __init__(self, index, name, url):
"""
HTTP Syringe is digital syringe for Jubilee
"""
# get config things from HTTP interface
config_r = requests.post(url + "/get_config", json={"name": name})
config = config_r.json()
super().__init__(index, **config, url=url)
status_r = requests.post(url + "/get_status", json={"name": name})
status = status_r.json()
[docs]
self.syringe_loaded = status["syringe_loaded"]
[docs]
self.remaining_volume = status["remaining_volume"]
return
@classmethod
[docs]
def from_config(cls, index, fp):
with open(fp) as f:
kwargs = json.load(f)
return cls(index, **kwargs)
[docs]
def status(self):
"""
Fetch and update status
"""
r = requests.post(self.url + "/get_status", json={"name": self.name})
status = r.json()
self.syringe_loaded = status["syringe_loaded"]
self.remaining_volume = status["remaining_volume"]
return status
[docs]
def load_syringe(self, volume, pulsewidth):
"""
Configure a syringe after physically loading it
volume: Current loaded volume in syringe
pulsewidth: current pulsewidth position of servo
"""
data = {}
data["volume"] = volume
data["pulsewidth"] = pulsewidth
data["name"] = self.name
requests.post(self.url + "/load_syringe", json=data)
status = self.status()
print(f'Loaded syringe, remaining volume {status["remaining_volume"]} uL')
return
@requires_active_tool
[docs]
def _aspirate(self, vol, s):
assert isinstance(vol, float) or isinstance(
vol, int
), "Vol must be float or int"
assert (
vol < self.capacity - self.remaining_volume
), f"Error: Syringe {self.name} available volume is {self.capacity - self.remaining_volume} uL, {vol} mL aspiration requested"
r = requests.post(
self.url + "/aspirate", json={"volume": vol, "name": self.name, "speed": s}
)
assert r.status_code == 200, f"Error in aspirate request: {r.content}"
status_r = requests.post(self.url + "/get_status", json={"name": self.name})
status_dict = status_r.json()
self.remaining_volume = status_dict["remaining_volume"]
return
@requires_active_tool
[docs]
def _dispense(self, vol, s):
assert isinstance(vol, float) or isinstance(
vol, int
), "Vol must be flaot or int"
assert (
vol <= self.remaining_volume
), f"Error: Syringe {self.name} remaining volume is {self.remaining_volume} uL, but {vol} uL dispense requested"
r = requests.post(
self.url + "/dispense", json={"volume": vol, "name": self.name, "speed": s}
)
assert r.status_code == 200, f"Error in dispense request: {r.content}"
status_r = requests.post(self.url + "/get_status", json={"name": self.name})
status_dict = status_r.json()
self.remaining_volume = status_dict["remaining_volume"]
return
@requires_active_tool
[docs]
def dispense(
self, vol: float, location: Union[Well, Tuple, Location], s: int = 100
):
"""Moves the pipette to the specified location and dispenses the desired volume of liquid
:param vol: The volume of liquid to dispense in uL
:type vol: float
:param location: The location to dispense the liquid into.
:type location: Union[Well, Tuple, Location]
:param s: Speed at which to dispense. Best effort compliance based on constraints of system. uL/S
:type s: int
:raises ToolStateError: If the pipette does not have a tip attached
"""
x, y, z = Labware._getxyz(location)
if type(location) == Well:
self.current_well = location
if z == location.z:
z = z + 10
else:
pass
elif type(location) == Location:
self.current_well = location._labware
else:
pass
self._machine.safe_z_movement()
self._machine.move_to(x=x, y=y, wait=True)
self._machine.move_to(z=z, wait=True)
self._dispense(vol, s)
@requires_active_tool
[docs]
def aspirate(
self, vol: float, location: Union[Well, Tuple, Location], s: int = 100
):
"""Moves the pipette to the specified location and aspirates the desired volume of liquid
:param vol: The volume of liquid to aspirate in uL
:type vol: float
:param location: The location from where to aspirate the liquid from.
:type location: Union[Well, Tuple, Location]
:param s: Speed at which to aspirate. Best effort compliance based on constraints of system. uL/S.
:type s: int
:raises ToolStateError: If the pipette does not have a tip attached
"""
x, y, z = Labware._getxyz(location)
if type(location) == Well:
self.current_well = location
elif type(location) == Location:
self.current_well = location._labware
else:
pass
self._machine.safe_z_movement()
self._machine.move_to(x=x, y=y, wait=True)
self._machine.move_to(z=z, wait=True)
self._aspirate(vol, s)
@requires_active_tool
[docs]
def mix(
self,
vol: float,
n_mix: int,
location: Union[Well, Tuple, Location],
t_hold: int = 1,
s_aspirate: int = 100,
s_dispense: int = 100,
):
"""
Mixes n times with volume vol
:param vol: Volume to aspirate/dispense in mixing. uL
:type vol: float
:param n_mix: Number of times to aspirate/dispense in mixing.
:type n_mix: int
:param location: The location from where to mix
:type location: Union[Well, Tuple, Location]
:param t_hold: Hold time at top and bottom of mix cycle to allow liquid to 'catch up'. Units seconds
:type t_hold: int
:param s: Speed at which to mix in uL/S. Best effort compliance.
:type s: int
"""
x, y, z = Labware._getxyz(location)
if type(location) == Well:
self.current_well = location
elif type(location) == Location:
self.current_well = location._labware
else:
pass
self._machine.safe_z_movement()
self._machine.move_to(x=x, y=y, wait=True)
self._aspirate(
500, s_aspirate
) # pre-aspirate 500 uL then blow this out at the end to avoid holding onto extra solution
self._machine.move_to(z=z, wait=True)
for _ in range(n_mix):
self._aspirate(vol, s_aspirate)
time.sleep(t_hold)
self._dispense(vol, s_dispense)
time.sleep(t_hold)
self._dispense(500, s_dispense)
[docs]
def set_pulsewidth(self, pulsewidth: int, s: int = 100):
"""
Manually move the servo actuator to a new location by setting the new pulsewidth.
Does not update volume, use carefully
:param pulsewidth: The servo motor pulsewidth to set the servo to. Read up on servo positioning for this to make sense. Ranges from 1000-2000 with a stricter range of accessible values for each syringe tool.
:type pulsewidth: int
:param s: Speed of movement in uL/S.
:type s: int
"""
assert pulsewidth > self.full_position
assert pulsewidth < self.empty_position
r = requests.post(
self.url + "/set_pulsewidth",
json={"pulsewidth": pulsewidth, "name": self.name, "speed": s},
)
status = self.status()
return