Source code for science_jubilee.tools.WebCamera

import json
import os
import time
import webbrowser
from typing import Tuple, Union

import cv2
import matplotlib.pyplot as plt
import numpy as np
import requests

from science_jubilee.labware.Labware import Labware, Location, Well
from science_jubilee.tools.Tool import Tool, requires_active_tool


[docs] class Camera(Tool): """A class representation of a Raspberry Pi camera server client.""" def __init__( self, index, name, ip_address, port, video_endpoint, still_endpoint, image_folder, focus_height, light: bool = False, light_pin: int = None, ): """Initializes the Camera object. :param index: The tool index of the pipette on the machine :type index: int :param name: The name associated with the tool (e.g. 'p300_single') :type name: str :param ip_address: The IP address of the Raspberry Pi camera server :type ip_address: str :param port: The port for the camera http requests :type port: int :param video_endpoint: The endpoint for the video feed :type video_endpoint: str :param still_endpoint: The endpoint for the still image :type still_endpoint: str :param image_folder: The folder to save captured images to :type image_folder: str :param light: LED ring light associated with the camera, defaults to False :type light: bool, optional :param light_pin: The GPIO pin (defined in the `M950` command in the machine `config.g`) for the LED ring light, defaults to None :type light_pin: int, optional """ super().__init__( index, name, ip_address=ip_address, port=port, video_endpoint=video_endpoint, still_endpoint=still_endpoint, image_folder=image_folder, light=light, light_pin=light_pin, focus_height=focus_height, )
[docs] self.still_url = f"http://{self.ip_address}:{self.port}/{self.still_endpoint}"
[docs] self.video_url = f"http://{self.ip_address}:{self.port}/{self.video_endpoint}"
# TODO: Ping camera server and make sure that it is reachable @classmethod
[docs] def from_config( cls, index, name, config_file: str, path: str = os.path.join(os.path.dirname(__file__), "configs"), ): """Initialize the pipette object from a config file :param index: The tool index of the pipette on the machine :type index: int :param name: The name associated with the tool (e.g. 'WebCamera') :type name: str :param config_file: The name of the config file containing the tool parameters :type config_file: str :param path: The path to the labware configuration `.json` files for the labware, defaults to the 'labware_definition/' in the science_jubilee/labware directory. :type path: str, optional :return: the initialized :class:`Camera` object :rtype: :class:`Camera` object """ config = os.path.join(path, config_file) with open(config, "rt") as f: kwargs = json.load(f) return cls(index=index, name=name, **kwargs)
@requires_active_tool
[docs] def _capture_image(self, timeout=30): """Capture image from raspberry pi and write to file :param timeout: the timeout for the http request, defaults to 10 :type timeout: int, optional :return: the image as a bstring :rtype: bytes """ time.sleep(1) try: response = requests.get(self.still_url, timeout=timeout) except [ConnectionError, ConnectionRefusedError]: raise AssertionError time.sleep(2) assert response.status_code == 200 return response.content
@requires_active_tool
[docs] def capture_image( self, location: Union[Well, Tuple], light: bool = False, light_intensity: int = 0, timeout=30, ): """Capture an image from the WebCamera at the specified location :param location: the location of the well to capture an image of :type location: Union[Well, Tuple] :param light: Option to turn on a ring light before taking the image, defaults to False :type light: bool, optional :param light_intensity: Intensity of the led rign light, defaults to 0 :type light_intensity: int, optional :return: the image as an bstring :rtype: bytes """ assert 0 <= light_intensity <= 1, "Light intensity must be between 0 and 1" x, y, z = Labware._getxyz(location) self._machine.safe_z_movement() self._machine.move_to(x=x, y=y, wait=True) picture_heigth = self.focus_height - abs(self.tool_offset) self._machine.move_to(z=picture_heigth, wait=True) if light is True: self._machine.gcode(f"M42 P{self.light_pin} S{light_intensity}") image = self._capture_image(timeout=timeout) self._machine.gcode(f"M42 P{self.light_pin} S0") else: image = self._capture_image() return image
[docs] def video_feed(self): """Deploys a video feed at the url specified in the config file""" webbrowser.open(self.video_url)
[docs] def decode_image(self, image_bin): """Decode a bstring image into an np.array :param image_bin: the image as a bstring :type image_bin: bytes :return: the image as an np.array :rtype: np.array """ image_arr = np.frombuffer(image_bin, np.uint8) image = cv2.imdecode(image_arr, cv2.IMREAD_COLOR) image_rgb = image[:, :, [2, 1, 0]] return image_rgb
[docs] def process_image(self, image_bin, radius=50): """Externally callable function to run processing pipeline :param image_bin: the image as a bstring :type image_bin: bytes :param radius: the radius (in pixels) of the circular mask, defaults to 50 :type radius: int, optional :return: the average rgb values of the masked image :rtype: list """ image = self.decode_image(image_bin) r = radius masked_image = self._mask_image(image, r) t = time.time() cv2.imwrite(f"./sampleimage_full_{t}.jpg", image) cv2.imwrite(f"./sampleimage_masked_{t}.jpg", masked_image) rgb_values = self._get_rgb_avg(masked_image) return rgb_values
[docs] def _mask_image(self, image, radius=50): """Apply a circular mask to an image :param image: the image object :type image: np.array :param radius: the size (in pixels) of the circular mask, defaults to 50 :type radius: int, optional :return: the masked image :rtype: np.array """ image_shape = image.shape[:2] w = image_shape[0] // 2 h = image_shape[1] // 2 mask = np.zeros(image_shape, dtype="uint8") cv2.circle(mask, (w, w), radius, 255, -1) masked = cv2.bitwise_and(image, image, mask=mask) return masked
[docs] def _get_rgb_avg(self, image): """Extract the average rgb values from an image :param image: the image object :type image: np.array :return: the average rgb values in a list [R,G,B] :rtype: list """ bgr = [] for dim in [0, 1, 2]: flatdim = image[:, :, dim].flatten() indices = flatdim.nonzero()[0] value = flatdim.flatten()[indices].mean() bgr.append(value) # opencv uses bgr so convert to rgb for loss print("swapping") rgb = [bgr[i] for i in [2, 1, 0]] return rgb
[docs] def view_image(self, image_bin, masked=False, radius=50): """Show the image in a matplotlib window :param image_bin: the image as a bstring :type image_bin: bytes :param masked: Wether to mask the image or not, defaults to False :type masked: bool, optional :param radius: the size (in pixel) of the circular mask toapply to the image , defaults to 50 :type radius: int, optional """ image = self.decode_image(image_bin) if masked is True: image = self._mask_image(image, radius) else: pass fig, ax = plt.subplots(figsize=(3, 4)) plt.setp(plt.gca(), autoscale_on=True) ax.imshow(image)