Skip to content
Snippets Groups Projects
Commit 5c18be2d authored by Emil Harlan's avatar Emil Harlan
Browse files

a

parent c6a47c20
No related branches found
No related tags found
No related merge requests found
import rclpy
from rclpy.node import Node
from rclpy.subscription import Subscription
from std_msgs.msg import String
from subprocess import Popen
from concurrent.futures import ThreadPoolExecutor
class LaunchNode(Node):
def __init__(self):
super().__init__("launch_node")
# Subscribe to the launch_node topic
self.subscription = self.create_subscription(
String, "launch_node", self.on_message, 10)
# Create a dictionary to store the launched processes
self.launched_processes = {}
# Create a thread pool
self.thread_pool = ThreadPoolExecutor(max_workers=10)
def on_message(self, msg):
# Get the launch command and name from the message
name = msg.name
command = msg.command
# If the launch command is "kill", send a SIGINT signal to the process with the given name
if command == "kill":
if name in self.launched_processes:
os.kill(self.launched_processes[name], signal.SIGINT)
del self.launched_processes[name]
else:
# Otherwise, launch the process with the given command
self.thread_pool.submit(self.launch_process, name, command)
def launch_process(self, name, command):
# Launch the process
process = Popen(command)
self.launched_processes[name] = process
def main():
rclpy.init()
node = LaunchNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()
\ No newline at end of file
......@@ -20,6 +20,7 @@ setup(
tests_require=['pytest'],
entry_points={
'console_scripts': [
"launch_node =robomaster_pi.launch_node:main"
"cmd_vel_combiner = robomaster_pi.cmd_vel_combiner:main",
"map_update = robomaster_pi.map_update:main",
"command_executer = robomaster_pi.command_executer:main",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment