Source code for znvis.visualizer.visualizer

"""
ZnVis: A Zincwarecode package.
License
-------
This program and the accompanying materials are made available under the terms
of the Eclipse Public License v2.0 which accompanies this distribution, and is
available at https://www.eclipse.org/legal/epl-v20.html
SPDX-License-Identifier: EPL-2.0
Copyright Contributors to the Zincwarecode Project.
Contact Information
-------------------
email: zincwarecode@gmail.com
github: https://github.com/zincware
web: https://zincwarecode.com/
Citation
--------
If you use this module please cite us with:

Summary
-------
Main visualizer class.
"""
import pathlib
import re
import shutil
import threading
import time
import typing

import cv2
import open3d as o3d
import open3d.visualization.gui as gui
from rich.progress import Progress, track

import znvis


[docs]class Visualizer: """ Main class to perform visualization. Attributes ---------- particles : list[znvis.Particle] A list of particle objects to add to the visualizer. app : o3d.gui.Application.instance An open3d application. vis : o3d.visualization.O3DVisualizer An open3d visualizer window counter : int Internally stored counter to track which configuration is currently being viewed. """ def __init__( self, particles: typing.List[znvis.Particle], output_folder: typing.Union[str, pathlib.Path] = "./", frame_rate: int = 24, number_of_steps: int = None, keep_frames: bool = False, bounding_box: znvis.BoundingBox = None, video_format: str = "mp4", ): """ Constructor for the visualizer. Parameters ---------- particles : list[znvis.Particle] List of particles to add to the visualizer. frame_rate : int Frame rate for the visualizer measured in frames per second (fps) number_of_steps : int Number of steps in the visualization. If None, the zeroth order of one particle is taken. This is left as an option in case the user wishes to overlay two particle trajectories of different length. keep_frames : bool If True, the visualizer will keep all frames after combining them into a video. video_format : str The format of the video to be generated. """ self.particles = particles self.frame_rate = frame_rate self.bounding_box = bounding_box() if bounding_box else None if number_of_steps is None: number_of_steps = particles[0].position.shape[0] self.number_of_steps = number_of_steps self.output_folder = pathlib.Path(output_folder).resolve() self.frame_folder = self.output_folder / "video_frames" self.video_format = video_format self.keep_frames = keep_frames self.obj_folder = self.output_folder / "obj_files" # Added later during run self.app = None self.vis = None self.counter = 0 def _initialize_app(self): """ Initialize the app object. Returns ------- Updates the app and vis class attributes. """ self.app = gui.Application.instance self.app.initialize() self.vis = o3d.visualization.O3DVisualizer("ZnVis Visualizer", 1920, 1080) self.vis.show_settings = True self.vis.reset_camera_to_default() # Add actions to the visualizer. self.vis.add_action("Step", self._update_particles) self.vis.add_action("Play", self._continuous_trajectory) self.vis.add_action("Export Scene", self._export_scene) self.vis.add_action("Screenshot", self._take_screenshot) self.vis.add_action("Export Video", self._export_video) self.vis.add_action("Export Mesh Trajectory", self._export_mesh_trajectory) # Add the visualizer to the app. self.app.add_window(self.vis) self.interrupt: int = 0 # 0 = Not running, 1 = running def _pause_run(self, vis): """ Pause a live visualization run. Returns ------- Set self.interrupt = 1 """ self.interrupt = 0 def _export_video(self, vis): """ Export a video of the simulation. Parameters ---------- vis : Visualizer The active visualizer. Returns ------- Saves a video locally. """ self.interrupt = 0 # stop live feed if running. # Create temporary directory self.frame_folder.mkdir(parents=True, exist_ok=True) # Write all PNG files to directory t = threading.Thread(target=self._record_trajectory) t.start() def _export_mesh_trajectory(self, vis): """ Export a video of the simulation. Parameters ---------- vis : Visualizer The active visualizer. Returns ------- Saves a video locally. """ self.interrupt = 0 # stop live feed if running. # Create temporary directory self.obj_folder.mkdir(parents=True, exist_ok=True) # Write all PNG files to directory t = threading.Thread(target=self._record_mesh_trajectory) t.start() def _create_movie(self): """ Concatenate images into a movie. This needs to be a seperate method so that the image storing thread can run to completion before this one is called. (GIL stuff) """ images = [f.as_posix() for f in self.frame_folder.glob("*.png")] # Sort images by number images = sorted(images, key=lambda s: int(re.search(r"\d+", s).group())) single_frame = cv2.imread(images[0]) height, width, layers = single_frame.shape video = cv2.VideoWriter( (self.output_folder / f"ZnVis-Video.{self.video_format}").as_posix(), 0, self.frame_rate, (width, height), ) for image in track(images, description="Exporting Video..."): video.write(cv2.imread(image)) cv2.destroyAllWindows() video.release() # Delete temporary directory if not storing run files if not self.keep_frames: shutil.rmtree(self.frame_folder, ignore_errors=False) def _export_scene(self, vis): """ Export the current visualization scene. Parametersor texture in ("albedo", "normal", "ao", "metallic", "roughness"): ---------- vis : Visualizer The active visualizer. Returns ------- Stores a .ply model locally. """ old_state = self.interrupt # get old state self.interrupt = 0 # stop live feed if running. for i, item in enumerate(self.particles): if i == 0: mesh = item.mesh_list[self.counter] else: mesh += item.mesh_list[self.counter] o3d.io.write_triangle_mesh( (self.output_folder / f"My_mesh_{self.counter}.obj").as_posix(), mesh ) # Restart live feed if it was running before the export. if old_state == 1: self._continuous_trajectory(vis) def _take_screenshot(self, vis): """ Take a screenshot Parameters ---------- vis : Visualizer The activate visualizer. Returns ------- Takes a screenshot and dumps it """ vis.export_current_image( (self.output_folder / f"screenshot_{self.counter}.png").as_posix() ) def _initialize_particles(self): """ Initialize the particles in the simulation. This method will construct the particle dictionaries in each Particle class and then add the first location of each particle to the visualizer window. """ # Build the mesh dict for each particle and add them to the window. for item in self.particles: item.construct_mesh_list() self._draw_particles(initial=True) def _draw_particles(self, visualizer=None, initial: bool = False): """ Draw the particles on the visualizer. Parameters ---------- initial : bool (default = True) If true, no particles are removed. Returns ------- updates the information in the visualizer. Notes ----- TODO: Use of initial is a dirty fix. It can be removed when support for transforming multiple geometry objects is added to open3d. """ # Check if a visualizer was passed. if visualizer is None: visualizer = self.vis # Add the particles to the visualizer. if initial: for i, item in enumerate(self.particles): visualizer.add_geometry( item.name, item.mesh_list[self.counter], item.mesh.o3d_material ) # check for bounding box if self.bounding_box is not None: visualizer.add_geometry("Box", self.bounding_box) else: for i, item in enumerate(self.particles): visualizer.remove_geometry(item.name) visualizer.add_geometry( item.name, item.mesh_list[self.counter], item.mesh.o3d_material ) def _continuous_trajectory(self, vis): """ Button command for running the simulation in the visualizer. Parameters ---------- vis : visualizer Object passed during the callback. """ if self.interrupt == 1: self._pause_run(vis) else: threading.Thread(target=self._run_trajectory).start() def _record_trajectory(self): """ Record the trajectory. """ self.update_thread_finished = True self.save_thread_finished = True def update_callable(): """ Function to be called on thread to update positions. """ self._update_particles() self.update_thread_finished = True def save_callable(): """ Function to be called on thread to save image. """ self.vis.export_current_image( (self.frame_folder / f"frame_{self.counter:0>6}.png").as_posix() ) self.save_thread_finished = True with Progress() as progress: task = progress.add_task("Saving scenes...", total=self.number_of_steps) # while self.counter < (self.number_of_steps - 1): while not progress.finished: time.sleep(1 / self.frame_rate) if self.save_thread_finished and self.update_thread_finished: self.save_thread_finished = False o3d.visualization.gui.Application.instance.post_to_main_thread( self.vis, save_callable ) progress.update(task, advance=1) if self.update_thread_finished: self.update_thread_finished = False o3d.visualization.gui.Application.instance.post_to_main_thread( self.vis, update_callable ) time.sleep(1) # Ensure the last image is saved self._create_movie() def _record_mesh_trajectory(self): """ Export the trajectory as mesh files. """ self.update_thread_finished = True self.save_thread_finished = True def update_callable(): """ Function to be called on thread to update positions. """ self._update_particles() self.update_thread_finished = True def save_callable(): """ Function to be called on thread to save image. """ for i, item in enumerate(self.particles): if i == 0: mesh = item.mesh_list[self.counter] else: mesh += item.mesh_list[self.counter] o3d.io.write_triangle_mesh( (self.obj_folder / f"export_mesh_{self.counter}.obj").as_posix(), mesh ) self.save_thread_finished = True with Progress() as progress: task = progress.add_task("Saving scenes...", total=self.number_of_steps) # while self.counter < (self.number_of_steps - 1): while not progress.finished: time.sleep(1 / self.frame_rate) if self.save_thread_finished and self.update_thread_finished: self.save_thread_finished = False o3d.visualization.gui.Application.instance.post_to_main_thread( self.vis, save_callable ) progress.update(task, advance=1) if self.update_thread_finished: self.update_thread_finished = False o3d.visualization.gui.Application.instance.post_to_main_thread( self.vis, update_callable ) time.sleep(1) # Ensure the last image is saved def _run_trajectory(self): """ Callback method for running the trajectory smoothly. Returns ------- Runs through the trajectory. """ self.interrupt = 1 # set global run state. while self.counter < self.number_of_steps: time.sleep(1 / self.frame_rate) o3d.visualization.gui.Application.instance.post_to_main_thread( self.vis, self._update_particles ) # Break if interrupted. if self.interrupt == 0: break self.interrupt = 0 # reset global state. def _update_particles(self, visualizer=None, step: int = None): """ Update the positions of the particles. Parameters ---------- step : int Step to update to. Returns ------- Updates the positions of the particles in the box. """ if visualizer is None: visualizer = self.vis if step is None: if self.counter == self.number_of_steps - 1: self.counter = 0 else: self.counter += 1 step = self.counter self._draw_particles(visualizer=visualizer) # draw the particles. visualizer.post_redraw() # re-draw the window.
[docs] def run_visualization(self): """ Run the visualization. Returns ------- Launches the visualization. """ self._initialize_app() self._initialize_particles() self.vis.reset_camera_to_default() self.app.run()