Commit 30151a93 authored by David Pradas's avatar David Pradas

Merge branch 'dev_standalone_collector' into 'dev'

Dev standalone collector

See merge request opensand/opensand!37
parents 6ab05ab7 dbec3073
Copyright:
Thales Alenia Space
www.thalesaleniaspace.com
Contributors:
The following persons participated in the OpenSAND Output development:
Vincent Duvert Viveris Technologies vduvert@toulouse.viveris.fr
Julien Bernard Viveris Technologies jbernard@toulouse.viveris.com
Bénédicte Motto Viveris technologies bmotto@toulouse.viveris.com
This diff is collapsed.
../ChangeLog
\ No newline at end of file
Build the collector:
$ python setup.py build
Install the collector
$ python setup.py install
See the installation manual on OpenSAND website (www.opensand.org) for more details
include AUTHORS
include ChangeLog
include COPYING
include INSTALL
include NEWS
graft conf
graft init
Introduction
------------
This piece of software is a collector for the satellite emulation
testbed OpenSAND.
It receives, stores and transmits events, errors and statistics.
License
-------
see COPYING
Program
-------
See the INSTALL file to learn how to install the collector.
References
----------
/var/log/opensand/collector.log {
weekly
rotate 4
missingok
notifempty
compress
}
:programname, contains, "sand-collector" -/var/log/opensand/collector.log
#!/bin/bash
### BEGIN INIT INFO
# Provides: opensand-collector
# Required-Start: $remote_fs $syslog
# Required-Stop: $remote_fs $syslog
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: Startup script for the OpenSAND collector
# Description: This script starts a OpenSAND service and a server
# that listen for OpenSAND manager requests
### END INIT INFO
# -*- coding: utf-8 -*-
# Debian init.d script for OpenSAND Collector
# Written by Julien BERNARD <jbernard@toulouse.viveris.com>
DESCR="the OpenSAND Collector"
SCRIPT_NAME="$0"
PROGNAME="sand-collector"
BIN="/usr/bin/sand-collector"
PIDFILE="/var/run/sand-collector/pid"
USER="opensand"
OPTIONS="-q -b"
IFACE=""
TYPE="_opensand._tcp"
. /lib/lsb/init-functions
# Source debconf library.
. /usr/share/debconf/confmodule
get_conf()
{
db_get opensand-collector/service/type
TYPE=$RET
db_get opensand-collector/service/interface
IFACE=$RET
}
# Start the server
start()
{
# read the configuration
get_conf
log_daemon_msg "Starting ${DESCR}" "${PROGNAME}"
# Check that executable is present
if [ ! -x ${BIN} ] ; then
log_failure_msg "sand-collector binary '${BIN}' not found on system"
log_end_msg 1
exit 1
fi
# Check that the configuration file is present
if [ ! -r ${CONF} ] ; then
log_failure_msg "${DESCR} config file '${CONF}' was not found on system"
log_end_msg 1
exit 1
fi
# Create the PID directory
mkdir -p /var/run/sand-collector
chown ${USER} /var/run/sand-collector
# Start the OpenSAND collector server
start-stop-daemon --start --background --quiet --pidfile ${PIDFILE} --chuid ${USER} --user ${USER} --exec ${BIN} -- ${OPTIONS} -i ${IFACE} -t ${TYPE} 1>/dev/null 2>&1
if [ $? -ne 0 ] ; then
log_failure_msg "failed to start ${DESCR}"
log_end_msg 1
exit 1
fi
# everything went fine
log_end_msg 0
touch /var/lock/${PROGNAME}
}
# Stop the OpenSAND collector server
stop()
{
log_daemon_msg "Shutting down ${DESCR}" "${PROGNAME}"
if [ -f "$PIDFILE" ] ; then
read PID < ${PIDFILE}
killproc -p ${PIDFILE} ${PROGNAME}
if [ $? -ne 0 ] ; then
log_failure_msg "failed to stop ${DESCR}"
log_end_msg 1
exit 1
fi
else
log_failure_msg "${PIDFILE} does not exist"
log_end_msg 1
# exit 1
return
fi
# Wait for the collector to stop
for try in $(seq 1 5) ; do
NB=$( ps -A -o "%p" | grep -c ${PID} )
[ ${NB} -eq 0 ] && break
sleep 1
done
# Be less gentle if the collector is still running
if [ ${NB} -ne 0 ] ; then
# We failed to stop the collector normally
log_failure_msg "failed to stop ${DESCR}"
log_end_msg 1
# Force the collector to stop
echo -n $"Force shutting down ${DESCR}: "
kill -9 ${PID} >/dev/null 2>&1
# Wait for the collector to stop
for try in $(seq 1 5) ; do
NB=$( ps -A -o "%p" | grep -c ${PID} )
[ ${NB} -eq 0 ] && break
sleep 1
done
if [ ${NB} -ne 0 ] ; then
log_failure_msg "${DESCR} was impossible to stop"
log_end_msg 1
exit 1
fi
rm -f ${PIDFILE}
fi
log_end_msg 0
rm -f /var/lock/${PROGNAME}
}
# Stop then start the server
restart()
{
stop
start || exit 1
}
# Which action to perform?
case "$1" in
start)
start
;;
stop)
stop
;;
restart)
restart
;;
condrestart)
[ -e /var/lock/${PROGNAME} ] && restart
;;
status)
status_of_proc -p ${PIDFILE} ${BIN} ${PROGNAME}
;;
*)
echo "Usage $0 {start|stop|restart|condrestart|status}"
exit 1
esac
exit 0
# Systemd service file for OpenSAND Collector
# Written by Joaquin MUGUERZA <jmuguerza@toulouse.viveris.com>
[Unit]
Description=Startup script for the OpenSAND daemon
After=remote-fs.target
After=systemd-journald-dev-log.socket
[Service]
Type=forking
ExecStart=/bin/bash -c " SAND_COLLECTOR_TYPE=`echo 'get opensand-collector/service/type' | debconf-communicate | awk '{print $2}'` && \
SAND_COLLECTOR_IFACE=`echo 'get opensand-collector/service/interface' | debconf-communicate | awk '{print $2}'` && \
/usr/bin/sand-collector -i $SAND_COLLECTOR_IFACE -t \"$SAND_COLLECTOR_TYPE\" & "
ExecStartPre=/bin/bash -c "mkdir -p /var/run/sand-collector && chown opensand /var/run/sand-collector"
[Install]
WantedBy=multi-user.target
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
#
#
# OpenSAND is an emulation testbed aiming to represent in a cost effective way a
# satellite telecommunication system for research and engineering activities.
#
#
# Copyright © 2019 TAS
#
#
# This file is part of the OpenSAND testbed.
#
#
# OpenSAND is free software : you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY, without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see http://www.gnu.org/licenses/.
#
#
# Author: Vincent Duvert / Viveris Technologies <vduvert@toulouse.viveris.com>
# Author: Joaquin MUGUERZA <jmuguerza@toulouse.viveris.com>
"""
__init__.py - Main file for the OpenSAND collector.
"""
from opensand_collector.messages_handler import MessagesHandler
from opensand_collector.manager import HostManager
from opensand_collector.service_handler import ServiceHandler
from opensand_collector.transfer_server import TransferServer
from opensand_collector.syslog_handler import SysLogHandler, syslog
from optparse import OptionParser
import errno
import fcntl
import logging
import os
import signal
import sys
LOGGER = logging.getLogger('sand-collector')
DEFAULT_SERVICE_TYPE = '_opensand._tcp'
def fail(message, *args):
"""
Report a startup error and exits.
"""
LOGGER.error(message % args)
sys.exit(1)
def read_pid_file(path):
"""
Returns the current content of the PID file, or 0 if it does not exist.
"""
try:
with open(path) as pid_file:
try:
return int(pid_file.read())
except ValueError:
return 0
except IOError:
return 0
def remove_pid(path):
"""
Remove the pid file
"""
try:
os.remove(path)
except OSError, msg:
LOGGER.warning("cannot remove pid file")
pass
class OpenSandCollector(object):
"""
This class serves as the entry point for the collector daemon.
"""
def __init__(self):
self._host_manager = HostManager()
def run(self):
"""
Start the collector
"""
parser = OptionParser()
parser.set_defaults(debug=False, background=False, kill=False)
parser.add_option("-t", "--platform-id", dest="platform_id",
default='', action="store",
help="OpenSAND platform name")
parser.add_option("-i", "--iface", dest="iface",
default='', action="store",
help="Interface for service publishing (default: all)")
parser.add_option("-v", "--verbose", action="store_true",
dest="verbose", default=False,
help = "Print more informations")
parser.add_option("-d", "--debug", action="store_true", dest="debug",
default=False, help="Show debug messages")
parser.add_option("-q", "--quiet", action="store_true", dest="quiet",
default=False, help="Stop printing logs in console")
parser.add_option("-b", "--background", action="store_true",
dest="background",
help="Run in background as opensand user")
parser.add_option("-p", "--pid", dest="pid",
default='/var/run/sand-collector/pid',
action="store",
help="Specify the file to save sand-collector PID")
parser.add_option("-k", "--kill", action="store_true", dest="kill",
help="Kill a background collector instance")
(options, _args) = parser.parse_args()
platform_id = options.platform_id
iface = options.iface
pid_path = options.pid
# Service type
service_type = (
'_%s%s' % (platform_id, DEFAULT_SERVICE_TYPE) if platform_id
else DEFAULT_SERVICE_TYPE
)
# Logging configuration
if options.background or options.quiet:
log_handler = SysLogHandler('sand-collector', syslog.LOG_PID,
syslog.LOG_DAEMON)
LOGGER.addHandler(log_handler)
# Print logs in terminal for debug
if not options.quiet:
log_handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)-5s "
"- %(message)-50s [%(filename)s:%(lineno)d]")
log_handler.setFormatter(formatter)
LOGGER.addHandler(log_handler)
LOGGER.setLevel(logging.WARNING)
if options.debug:
LOGGER.setLevel(logging.DEBUG)
elif options.verbose:
LOGGER.setLevel(logging.INFO)
if options.kill:
pid = read_pid_file(pid_path)
if pid == 0:
fail("The collector does not seem to be running (no PID).")
try:
os.kill(pid, signal.SIGTERM)
except OSError, msg:
remove_pid(pid_path)
fail("Cannot kill sand-collector: " + str(msg))
os._exit(0)
if options.background:
try:
if os.path.exists(pid_path):
fail("pid already exists")
bg_fd = os.open(pid_path, os.O_WRONLY | os.O_CREAT)
fcntl.flock(bg_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError, err:
fail(str(err))
except IOError, err:
if err.errno == errno.EACCES or err.errno == errno.EAGAIN:
fail("The collector seem to be already running in the "
"background.")
else:
fail(str(err))
pid = os.fork()
if pid:
os.write(bg_fd, str(pid))
os._exit(0)
null = open(os.path.devnull, 'r+')
sys.stdin = sys.stdout = sys.stderr = null
try:
with MessagesHandler(self._host_manager) as msg_handler:
port = msg_handler.get_port()
with TransferServer(self._host_manager) as transfer_server:
trsfer_port = transfer_server.get_port()
with ServiceHandler(self._host_manager, port, trsfer_port,
service_type, iface) as service:
def handler(_sig, _frame):
"""
SIGTERM handler
"""
logging.info("SIGTERM caught, quitting.")
service.stop()
signal.signal(signal.SIGTERM, handler)
service.run()
finally:
self._host_manager.cleanup()
if options.background:
remove_pid(pid_path)
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
#
#
# OpenSAND is an emulation testbed aiming to represent in a cost effective way a
# satellite telecommunication system for research and engineering activities.
#
#
# Copyright © 2019 TAS
#
#
# This file is part of the OpenSAND testbed.
#
#
# OpenSAND is free software : you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY, without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see http://www.gnu.org/licenses/.
#
#
# Author: Vincent Duvert / Viveris Technologies <vduvert@toulouse.viveris.com>
"""
__main__.py - This module can be used to start the collector without installing
it into the system.
This can be done with the command "python -m opensand_collector" (or
"python -m opensand_collector.__main__" for Python < 2.7)
"""
from opensand_collector import OpenSandCollector
if __name__ == '__main__':
OpenSandCollector().run()
This diff is collapsed.
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
#
#
# OpenSAND is an emulation testbed aiming to represent in a cost effective way a
# satellite telecommunication system for research and engineering activities.
#
#
# Copyright © 2019 TAS
#
#
# This file is part of the OpenSAND testbed.
#
#
# OpenSAND is free software : you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY, without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see http://www.gnu.org/licenses/.
#
#
# Author: Vincent Duvert / Viveris Technologies <vduvert@toulouse.viveris.com>
"""
service_handler.py - OpenSAND collector Avahi service handler.
"""
from dbus.mainloop.glib import DBusGMainLoop
from dbus.exceptions import DBusException
import gobject
import avahi
import dbus
import logging
LOGGER = logging.getLogger('sand-collector')
def on_error(self, *args):
""" error handler """
if len(args) == 0:
return
LOGGER.error('service error handler: ' + str(args[0]))
class ServiceHandler(object):
"""
Avahi service handler to publish the collector listening port (so the
daemons can find it) and find the OpenSAND daemons and their IPs.
"""
def __init__(self, host_manager, listen_port, transfer_port,
service_type, iface):
self._host_manager = host_manager
self._listen_port = listen_port
self._transfer_port = transfer_port
self._service_type = service_type
self._iface = iface
self._pub_group = None
self._disco_server = None
self._known_hosts = set()
def __enter__(self):
"""
Publish the service and set up service discovery
"""
bus = dbus.SystemBus(mainloop=DBusGMainLoop())
# Publishing
pub_server = dbus.Interface(bus.get_object(avahi.DBUS_NAME,
avahi.DBUS_PATH_SERVER), avahi.DBUS_INTERFACE_SERVER)
self._pub_group = dbus.Interface(bus.get_object(avahi.DBUS_NAME,
pub_server.EntryGroupNew()), avahi.DBUS_INTERFACE_ENTRY_GROUP)
additional_data = ["transfer_port=%d" % self._transfer_port]
if self._iface != '':
try:
iface = pub_server.GetNetworkInterfaceIndexByName(self._iface)
except DBusException:
LOGGER.warning("Cannot publish Avahi service on %s iface",
self._iface)
iface = avahi.IF_UNSPEC
else:
iface = avahi.IF_UNSPEC
try:
self._pub_group.AddService(iface, avahi.PROTO_INET, dbus.UInt32(0),
"collector", self._service_type, "", "",
dbus.UInt16(self._listen_port),
additional_data)
except dbus.exceptions.DBusException as error:
LOGGER.error("cannot add Avahi service (%s)" % error)
raise KeyboardInterrupt
self._pub_group.Commit()
self._mainloop = gobject.MainLoop()
gobject.threads_init() # Necessary for the transfer_server thread
# Discovery
self._disco_server = dbus.Interface(bus.get_object(avahi.DBUS_NAME,
'/'), 'org.freedesktop.Avahi.Server')
disco_browser = dbus.Interface(bus.get_object(avahi.DBUS_NAME,
self._disco_server.ServiceBrowserNew(iface,
avahi.PROTO_INET, self._service_type, 'local',
dbus.UInt32(0))), avahi.DBUS_INTERFACE_SERVICE_BROWSER)
disco_browser.connect_to_signal("ItemNew", self._handle_new)
disco_browser.connect_to_signal("ItemRemove", self._handle_remove)
LOGGER.info("Avahi service handler started.")
return self
def run(self):
""" start the mainloop for service listening """
self._mainloop.run()
def _handle_new(self, interface, protocol, name, stype, domain, flags):
"""
Handles a newly detected service
"""
if name == "collector":
return
def error_handler(*_args):
"""
Error handler for service resolution errors
"""
LOGGER.error("Unable to get resolve service %s, ignoring.", name)
return
self._disco_server.ResolveService(interface, protocol, name, stype,
domain, avahi.PROTO_INET, dbus.UInt32(0),
reply_handler=self._handle_resolve, error_handler=error_handler)
def _handle_resolve(self, *args):
"""
Called when a detected service is resolved
"""
name = args[2]
addr = args[7]
txt = args[9]
try:
items = dict("".join(chr(i) for i in arg).split("=", 1)
for arg in txt)
port = int(items.get('ext_port', ""))
except ValueError:
LOGGER.error("Failed to get UDP port from '%s' daemon.", name)
return
if name in self._known_hosts:
self._host_manager.add_host_addr(name, (addr, port))
return
LOGGER.info("Daemon on host '%s' has address %s:%d.", name, addr, port)
self._known_hosts.add(name)
self._host_manager.add_host(name, (addr, port))
def _handle_remove(self, _interface, _proto, name, _stype, _domain, _flags):