Skip to content
Snippets Groups Projects
Commit 4fe46bef authored by Emil Harlan's avatar Emil Harlan
Browse files

a

parent f5d3cd30
No related branches found
No related tags found
No related merge requests found
......@@ -2,13 +2,11 @@ import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from subprocess import Popen, PIPE, STDOUT
from concurrent.futures import ThreadPoolExecutor
import threading
class LaunchNode(Node):
class LaunchControlNode(Node):
def __init__(self):
super().__init__("launch_node")
super().__init__("launch_control_node")
# Subscribe to the launch_node topic
self.subscription = self.create_subscription(
......@@ -16,7 +14,6 @@ class LaunchNode(Node):
# Create a dictionary to store the launched processes
self.launched_processes = {}
self.process_lock = threading.Lock() # Lock for thread safety
def on_message(self, msg):
# Get the launch command and name from the message
......@@ -27,51 +24,39 @@ class LaunchNode(Node):
if command == "kill":
self._terminate_process(name)
else:
# Otherwise, launch the process with the given command in a separate thread
thread = threading.Thread(target=self._launch_process, args=(name, command))
thread.start()
# Otherwise, launch the process with the given command
self._launch_process(name, command)
def _terminate_process(self, name):
with self.process_lock:
if name in self.launched_processes:
try:
process = self.launched_processes[name]
process.send_signal(Popen.SIGINT)
process.wait(timeout=5) # Wait for the process to terminate (timeout: 5 seconds)
del self.launched_processes[name]
self.get_logger().info(f"Process '{name}' has been terminated.")
except Exception as e:
self.get_logger().error(f"Error terminating process '{name}': {e}")
else:
self.get_logger().warn(f"Process '{name}' not found in launched_processes.")
def _launch_process(self, name, command):
with self.process_lock:
if name in self.launched_processes:
try:
# Launch the process
process = Popen(command, shell=True, stdout=PIPE, stderr=STDOUT)
self.launched_processes[name] = process
self.get_logger().info(f"Process '{name}' has been launched.")
# Read and log the output of the process
for line in process.stdout:
self.get_logger().info(f"Process '{name}' output: {line.decode().strip()}")
process = self.launched_processes[name]
process.send_signal(Popen.SIGINT)
process.wait(timeout=5) # Wait for the process to terminate (timeout: 5 seconds)
del self.launched_processes[name]
self.get_logger().info(f"Process '{name}' has been terminated.")
except Exception as e:
self.get_logger().error(f"Error launching process '{name}': {e}")
self.get_logger().error(f"Error terminating process '{name}': {e}")
else:
self.get_logger().warn(f"Process '{name}' not found in launched_processes.")
def _launch_process(self, name, command):
try:
# Launch the process
process = Popen(command, shell=True, stdout=PIPE, stderr=STDOUT)
self.launched_processes[name] = process
self.get_logger().info(f"Process '{name}' has been launched.")
# Read and log the output of the process
for line in process.stdout:
self.get_logger().info(f"Process '{name}' output: {line.decode().strip()}")
except Exception as e:
self.get_logger().error(f"Error launching process '{name}': {e}")
def main():
rclpy.init()
node = LaunchNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
# Terminate all running processes before shutting down
for name, process in node.launched_processes.items():
node._terminate_process(name)
node.destroy_node()
rclpy.shutdown()
node = LaunchControlNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment