Source code for bgpranking.helper_global

#!/usr/bin/python
# -*- coding: utf-8 -*-

import datetime
import redis
from dateutil import parser

import constraints as c

__global_db = None
__history_db = None
__config_db = None
__history_db_cache = None

# Tunnel: ssh remote_instance -L 6379:remote_instance:6379

# Default responses
## NOK
__unknown_asn = {'return_code': 0, 'return_vebose': 'Unknown ASN'}
__no_ranks = {'return_code': 0,
            'return_vebose': 'No ranks in this timeframe.'}
__unknown_block = {'return_code': 0,
            'return_vebose': 'Unable to find this IP block.'}
## OK
__ranks = {'return_code': 1, 'return_vebose': 'Got ranks.'}
__owner = {'return_code': 1, 'return_vebose': 'Got owner name.'}

[docs]def get_all_weights(date = None): """ Get the weights for all the sources. :param date: Date of the information (default: last ranked day) :rtype: Dictionary .. note:: Format of the dictionary: .. code-block:: python { source: date, ... } """ if date is None: date = get_default_date() sources = daily_sources([date]) impacts = __config_db.mget(sources) return dict(zip(sources, impacts))
[docs]def daily_sources(dates): """ Get the sources parsed during a list of dates :param dates: List of dates :rtype: Set of sources for each date """ if len(dates) == 1: return __global_db.smembers('{date}|sources'.format(date = dates[0])) else: p = __global_db.pipeline(False) [p.smembers('{date}|sources'.format(date = date)) for date in dates] return p.execute()
[docs]def prepare_sources_by_dates(last_day = None, timeframe = None): """ Get a dictionary of sources by dates. :param last_day: Last day of the interval :param timeframe: size of the interval :rtype: Dictionary .. note:: Format of the dictionary: .. code-block:: python { date: [source1, source2, ...], ... } """ dates = __dates_interval(last_day, timeframe) sources = daily_sources(dates) if len(dates) == 1: return {dates[0]: sources} return dict(zip(dates, sources))
[docs]def get_sources_by_dates(sources, last_day = None, timeframe = None): """ Get a dictionary of sources by dates. Only with the sources passed in parameter IF they exist for the day. :param sources: List of sources :param last_day: Last day of the interval :param timeframe: size of the interval :rtype: Dictionary .. note:: Format of the dictionary: .. code-block:: python { date: [source1, source2, ...], ... } """ dates = __dates_interval(last_day, timeframe) p = __global_db.pipeline(False) for date in dates: [p.sismember('{date}|sources'.format(date = date), source) for source in sources] exists = p.execute() i = 0 to_return = {} for date in dates: if to_return.get(date) is None: to_return[date] = [] for source in sources: if exists[i]: to_return[date].append(source) i += 1 return to_return
[docs]def last_seen_sources(date_sources): """ Get the last time a source has been seen. :param dates_sources: Dictionaries of the dates and sources .. note:: Format of the dictionary: .. code-block:: python { YYYY-MM-DD: [source1, source2, ...], YYYY-MM-DD: [source1, source2, ...], ... } :rype: Dictionary .. note:: Format of the dictionary: .. code-block:: python { source: date, ... } """ last_seen = {} for date in sorted(date_sources.iterkeys(), reverse = True): for source in date_sources[date]: if last_seen.get(source) is None: last_seen[source] = date return last_seen
[docs]def asn_exists(asn): """ Check if the ASN exists in the database. """ return __global_db.exists(asn)
[docs]def get_default_date(): """ Get the latest ranked day. """ return __get_default_date_raw().isoformat()
def __prepare(): global __global_db global __history_db global __config_db global __history_db_cache __global_db = redis.Redis(port = c.redis_port, db = c.redis_db_global, host = c.redis_hostname) __history_db = redis.Redis(port = c.redis_port, db = c.redis_db_history, host = c.redis_hostname) __config_db = redis.Redis(port = c.redis_port, db = c.redis_db_config, host = c.redis_hostname) __history_db_cache = redis.Redis(port = c.redis_cached_port, db = c.redis_cached_db_history, host = c.redis_hostname) def __get_default_date_raw(): """ Get the default date displayed on the website. """ timestamp = __history_db.get('latest_ranking') delta = datetime.timedelta(days=1) if timestamp is not None: default_date_raw = parser.parse(timestamp.split()[0]).date() - delta else: default_date_raw = datetime.date.today() - delta return default_date_raw def __dates_interval(last_day = None, timeframe = None): if last_day is None: last_day = __get_default_date_raw() else: last_day = parser.parse(last_day).date() if timeframe is None: timeframe = c.default_timeframe return [(last_day - datetime.timedelta(days=i)).isoformat() for i in range(timeframe)]