Skip to content
Snippets Groups Projects
Commit ec7b211c authored by Bob Lantz's avatar Bob Lantz
Browse files

Buffered output. Added net.monitor() and node.readline()

Moved monitor() and readline() into net.py and node.py respectively,
which will hopefully be useful for monitoring large sets of hosts,
as is done in udpbwtest.py.

Changed iperf to use interactive command infrastructure (such as it
is), which may make it more reliable. Hopefully it's a bit clearer
as well, although it is slightly more complicated.
parent 6ab1f1ff
No related branches found
No related tags found
No related merge requests found
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
import os import os
import re import re
import select
import sys import sys
from time import time from time import time
...@@ -30,37 +29,6 @@ ...@@ -30,37 +29,6 @@
from mininet.topolib import TreeTopo from mininet.topolib import TreeTopo
from mininet.util import quietRun from mininet.util import quietRun
# Some useful stuff: buffered readline and host monitoring
def readline( host, buf ):
"Read a line from a host, buffering with buffer."
buf += host.read( 1024 )
if '\n' not in buf:
return None, buf
pos = buf.find( '\n' )
line = buf[ 0 : pos ]
rest = buf[ pos + 1: ]
return line, rest
def monitor( hosts, seconds ):
"Monitor a set of hosts and yield their output."
poller = select.poll()
Node = hosts[ 0 ] # so we can call class method fdToNode
buffers = {}
for host in hosts:
poller.register( host.stdout )
buffers[ host ] = ''
quitTime = time() + seconds
while time() < quitTime:
ready = poller.poll()
for fd, event in ready:
host = Node.fdToNode( fd )
if event & select.POLLIN:
line, buffers[ host ] = readline( host, buffers[ host ] )
if line:
yield host, line
yield None, ''
# bwtest support # bwtest support
def parsebwtest( line, def parsebwtest( line,
...@@ -111,8 +79,9 @@ def udpbwtest( net, seconds ): ...@@ -111,8 +79,9 @@ def udpbwtest( net, seconds ):
print print
results = {} results = {}
print "*** Monitoring hosts" print "*** Monitoring hosts"
output = monitor( hosts, seconds ) output = net.monitor( hosts )
while True: quitTime = time() + seconds
while time() < quitTime:
host, line = output.next() host, line = output.next()
if host is None: if host is None:
break break
......
...@@ -85,6 +85,7 @@ ...@@ -85,6 +85,7 @@
import os import os
import re import re
import select
import signal import signal
from time import sleep from time import sleep
...@@ -382,6 +383,25 @@ def run( self, test, **params ): ...@@ -382,6 +383,25 @@ def run( self, test, **params ):
self.stop() self.stop()
return result return result
def monitor( self, hosts=None ):
"""Monitor a set of hosts (or all hosts by default),
and return their output, a line at a time.
returns: host, line"""
if hosts is None:
hosts = self.hosts
poller = select.poll()
Node = hosts[ 0 ] # so we can call class method fdToNode
for host in hosts:
poller.register( host.stdout )
while True:
ready = poller.poll()
for fd, event in ready:
host = Node.fdToNode( fd )
if event & select.POLLIN:
line = host.readline()
if line:
yield host, line
@staticmethod @staticmethod
def _parsePing( pingOutput ): def _parsePing( pingOutput ):
"Parse ping output and return packets sent, received." "Parse ping output and return packets sent, received."
...@@ -460,10 +480,10 @@ def iperf( self, hosts=None, l4Type='TCP', udpBw='10M' ): ...@@ -460,10 +480,10 @@ def iperf( self, hosts=None, l4Type='TCP', udpBw='10M' ):
hosts = [ self.hosts[ 0 ], self.hosts[ -1 ] ] hosts = [ self.hosts[ 0 ], self.hosts[ -1 ] ]
else: else:
assert len( hosts ) == 2 assert len( hosts ) == 2
host0, host1 = hosts client, server = hosts
output( '*** Iperf: testing ' + l4Type + ' bandwidth between ' ) output( '*** Iperf: testing ' + l4Type + ' bandwidth between ' )
output( "%s and %s\n" % ( host0.name, host1.name ) ) output( "%s and %s\n" % ( client.name, server.name ) )
host0.cmd( 'killall -9 iperf' ) server.cmd( 'killall -9 iperf' )
iperfArgs = 'iperf ' iperfArgs = 'iperf '
bwArgs = '' bwArgs = ''
if l4Type == 'UDP': if l4Type == 'UDP':
...@@ -471,14 +491,17 @@ def iperf( self, hosts=None, l4Type='TCP', udpBw='10M' ): ...@@ -471,14 +491,17 @@ def iperf( self, hosts=None, l4Type='TCP', udpBw='10M' ):
bwArgs = '-b ' + udpBw + ' ' bwArgs = '-b ' + udpBw + ' '
elif l4Type != 'TCP': elif l4Type != 'TCP':
raise Exception( 'Unexpected l4 type: %s' % l4Type ) raise Exception( 'Unexpected l4 type: %s' % l4Type )
server = host0.cmd( iperfArgs + '-s &' ) server.sendCmd( iperfArgs + '-s', printPid=True )
debug( '%s\n' % server ) servout = ''
client = host1.cmd( iperfArgs + '-t 5 -c ' + host0.IP() + ' ' + while server.lastPid is None:
servout += server.monitor()
cliout = client.cmd( iperfArgs + '-t 5 -c ' + server.IP() + ' ' +
bwArgs ) bwArgs )
debug( '%s\n' % client ) debug( 'Client output: %s\n' % cliout )
server = host0.cmd( 'killall -9 iperf' ) server.sendInt()
debug( '%s\n' % server ) servout += server.waitOutput()
result = [ self._parseIperf( server ), self._parseIperf( client ) ] debug( 'Server output: %s\n' % servout )
result = [ self._parseIperf( servout ), self._parseIperf( cliout ) ]
if l4Type == 'UDP': if l4Type == 'UDP':
result.insert( 0, udpBw ) result.insert( 0, udpBw )
output( '*** Results: %s\n' % result ) output( '*** Results: %s\n' % result )
......
...@@ -94,11 +94,7 @@ def __init__( self, name, inNamespace=True, ...@@ -94,11 +94,7 @@ def __init__( self, name, inNamespace=True,
self.defaultMAC = defaultMAC self.defaultMAC = defaultMAC
self.lastCmd = None self.lastCmd = None
self.lastPid = None self.lastPid = None
# Grab PID self.readbuf = ''
self.waiting = True
while self.lastPid is None:
self.monitor()
self.pid = self.lastPid
self.waiting = False self.waiting = False
@classmethod @classmethod
...@@ -115,9 +111,30 @@ def cleanup( self ): ...@@ -115,9 +111,30 @@ def cleanup( self ):
# Subshell I/O, commands and control # Subshell I/O, commands and control
def read( self, bytes ): def read( self, bytes ):
"""Read from a node. """Buffered read from node, non-blocking.
bytes: maximum number of bytes to read""" bytes: maximum number of bytes to return"""
return os.read( self.stdout.fileno(), bytes ) count = len( self.readbuf )
if count < bytes:
data = os.read( self.stdout.fileno(), bytes - count )
self.readbuf += data
if bytes >= len( self.readbuf ):
result = self.readbuf
self.readbuf = ''
else:
result = self.readbuf[ :bytes ]
self.readbuf = self.readbuf[ bytes: ]
return result
def readline( self ):
"""Buffered readline from node, non-blocking.
returns: line (minus newline) or None"""
self.readbuf += self.read( 1024 )
if '\n' not in self.readbuf:
return None
pos = self.readbuf.find( '\n' )
line = self.readbuf[ 0 : pos ]
self.readbuf = self.readbuf[ pos + 1: ]
return line
def write( self, data ): def write( self, data ):
"""Write data to node. """Write data to node.
...@@ -135,7 +152,8 @@ def stop( self ): ...@@ -135,7 +152,8 @@ def stop( self ):
def waitReadable( self ): def waitReadable( self ):
"Wait until node's output is readable." "Wait until node's output is readable."
self.pollOut.poll() if len( self.readbuf ) == 0:
self.pollOut.poll()
def sendCmd( self, cmd, printPid=False ): def sendCmd( self, cmd, printPid=False ):
"""Send a command, followed by a command to echo a sentinel, """Send a command, followed by a command to echo a sentinel,
...@@ -155,10 +173,10 @@ def sendCmd( self, cmd, printPid=False ): ...@@ -155,10 +173,10 @@ def sendCmd( self, cmd, printPid=False ):
self.lastPid = None self.lastPid = None
self.waiting = True self.waiting = True
def sendInt( self ): def sendInt( self, sig=signal.SIGINT ):
"Interrupt running command." "Interrupt running command."
if self.lastPid: if self.lastPid:
os.kill( self.lastPid, signal.SIGINT ) os.kill( self.lastPid, sig )
def monitor( self ): def monitor( self ):
"""Monitor and return the output of a command. """Monitor and return the output of a command.
...@@ -315,7 +333,7 @@ def setHostRoute( self, ip, intf ): ...@@ -315,7 +333,7 @@ def setHostRoute( self, ip, intf ):
def setDefaultRoute( self, intf ): def setDefaultRoute( self, intf ):
"""Set the default route to go through intf. """Set the default route to go through intf.
intf: string, interface name""" intf: string, interface name"""
self.cmd( 'ip route flush' ) self.cmd( 'ip route flush root 0/0' )
return self.cmd( 'route add default ' + intf ) return self.cmd( 'route add default ' + intf )
def IP( self, intf=None ): def IP( self, intf=None ):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment