# -------------------------------------------------------------------
# - NAME: utils.py
# - AUTHOR: Reto Stauffer
# - DATE: 2014-09-13
# -------------------------------------------------------------------
# - DESCRIPTION: Some helper functions.
# -------------------------------------------------------------------
# - EDITORIAL: 2014-09-13, RS: Created file on thinkreto.
# 2014-12-31, RS: Inputs str('None') will be
# converted to None.
# 2015-08-05, RS: Moved inputcheck into utils.
# -------------------------------------------------------------------
# - L@ST MODIFIED: 2018-01-24 12:12 on marvin
# -------------------------------------------------------------------
"""
Documentation for this module.
More details should be added.
"""
import numpy as np
# -------------------------------------------------------------------
# - Prevent the scripts from executing some of the routines on certain
# tournament days.
# -------------------------------------------------------------------
def datelock(config,tdate):
    """Check whether computations are allowed for a given tournament date.

    Used to prevent the scripts from re-computing certain things, e.g.
    the mean bet tips in the archive. Problem: we do not know who was in
    which group in the past, wherefore the mean bets would get wrong if
    recomputed.

    The decision is taken by evaluating the expression
    ``"<tdate> <config['datelock']>"``. ``config['datelock']`` therefore
    has to be a string which forms a valid Python expression when it is
    appended to the integer date (for example ``"> 16000"``).

    Args:
        config (:obj:`dict`): The config dict from :meth:`utils.readconfig`.
            The entry ``datelock`` must be a string.
        tdate (:obj:`int`): Date as integer, days since 1970-01-01.
            Integral floats (as returned by :meth:`utils.datetime2tdate`)
            are accepted as well, the value is passed through :func:`int`.

    Returns:
        bool: Result of the evaluated expression. True if you are allowed
        to execute the computation and False otherwise.
    """
    # NOTE(review): eval() executes whatever is written in the 'datelock'
    # option of the config file. Only use config files you trust.
    return eval("{0:d} {1:s}".format(int(tdate),config['datelock']))
# -------------------------------------------------------------------
# - Parsing input arguments
# -------------------------------------------------------------------
# -------------------------------------------------------------------
# - Show the usage and exit.
# -------------------------------------------------------------------
def usage(what=None):
    """Print the script usage and exit.

    Reads ``config.conf``, opens the database and fetches all cities to be
    able to list the allowed city IDs, names and hashes in the usage text.
    Always terminates via :meth:`utils.exit`.

    Args:
        what (:obj:`str`): String to specify which usage should be used.
            If None a note is printed that the caller should specify it.

    .. todo:: A bug! Change iputcheck, add propper usage.
    """
    import database

    config = readconfig('config.conf')
    db     = database.database(config)
    cities = db.get_cities()

    # Collect the identifiers a city can be addressed with
    IDs    = [city['ID']   for city in cities]
    names  = [city['name'] for city in cities]
    hashes = [city['hash'] for city in cities]

    if what is None:
        print("""
Run into the usage from the inputcheck module with None type.
You should set an explcit 'what' type when calling the usage
so that I can give you a propper exit statement and some
explanation which input options are allowed.
""")
    else:
        print("""
Sorry, wrong usage for type %s.
Allowed inputs for this script are:
-u/--user: A userID or a user_login name. Most
script accept this and compute the points
or whatever it is for this user only.
-s/--users: A list of user names, seperated by commas, no spaces!
-c/--city: City can be given by its ID, name or hash
IDs: %s
names: %s
hashes: %s
-a/--alldates Cannot be combined with the -t/--tdate option.
If set loop over all available dates.
-t/--tdate: Tournament date in days since 1970-01-01
-p/--param: A list of paramIDs.
-n/--filename A filename for exporting tables/data.
-d/--dates: A range of dates seperated by ","
-a/--alldates: ignores -t input. Takes all tournament dates
from the database to compute the points.
-f/--force: Please DO NOT USE. This was for testing
purpuses to bypass some securety features
of the scripts!!!! But these securety
features are there for some reason. So
please do not use.
""" % (what, IDs, names, hashes))

    # exit() is the customized exit handler defined in this module
    exit('This is/was the usage (what: %s).' % what)
# -------------------------------------------------------------------
# - Reading configuration file
# -------------------------------------------------------------------
def readconfig(file='config.conf',inputs=None,conversion_table=None):
    """Reading config file. There is a 'global' wetterturnier backend
    config file which is necessary to handle all the actions.

    The script stops (see :meth:`utils.exit`) if the file does not exist
    or if one of the required options cannot be read. Required are the
    ``[database]`` options (``mysql_host``, ``mysql_user``, ``mysql_pass``,
    ``mysql_db``, ``mysql_prefix``, ``mysql_obstable``), ``[judging]``
    (``operational``, ``test``), ``[data]`` (``moses``, ``moses_out``) and
    ``[system]`` (``rawdir``). The ``[migrate]`` options are optional and
    fall back to defaults. Missing data directories only trigger a warning
    on stdout.

    Args:
        file (:obj:`str`): File name of the config file. Default is ``config.conf``.
        inputs (:obj:`dict`): Usually the input dict from :meth:`utils.inputcheck`.
            Default is None. If it is a dict: all parameters will be added to
            the config dict which will be generated in this method. Values
            equal to the string ``'None'`` are stored as None.
            In case a key exists in the config dict (created in here) and is
            duplicated in the inputs dict the script will stop immediately.
        conversion_table: Stored unchanged as ``config['conversion_table']``.

    Returns:
        dict: A dict containing all necessary configs.
    """
    import os
    try:
        from ConfigParser import ConfigParser    # Python 2
    except ImportError:
        from configparser import ConfigParser    # Python 3

    if not os.path.isfile(file):
        exit('Cannot read file %s. Not readable or not existing' % file)

    CNF = ConfigParser()
    CNF.read(file)

    config = {}
    config['conversion_table'] = conversion_table

    # ----------------------------------------------------------------
    # - Reading mysql config. "except Exception" (instead of a bare
    #   except) keeps KeyboardInterrupt/SystemExit untouched.
    try:
        for key in ('mysql_host','mysql_user','mysql_pass',
                    'mysql_db','mysql_prefix','mysql_obstable'):
            config[key] = CNF.get('database',key)
    except Exception:
        exit('Problems reading the database config from the config file %s' % file)

    # ----------------------------------------------------------------
    # - Reading migration flags. If anything is missing the migration
    #   is switched off.
    try:
        config['migrate_groups']         = CNF.getboolean('migrate','groups')
        config['migrate_mitspieler']     = CNF.getboolean('migrate','mitspieler')
        config['migrate_mitspielerfile'] = CNF.get('migrate','mitspielerfile')
        config['migrate_wpconfig']       = CNF.get('migrate','wpconfig')
        config['migrate_citytags']       = [tag.strip() for tag in
                                            CNF.get('migrate','citytags').split(',')]
    except Exception:
        config['migrate_mitspieler'] = False
        config['migrate_groups']     = False
        config['migrate_citytags']   = None

    # ----------------------------------------------------------------
    # - datelock if set
    try:
        config['datelock'] = CNF.get('migrate','datelock')
    except Exception:
        config['datelock'] = None

    # - Whether the system is allowed to create users or not
    try:
        config['allow_create_users'] = CNF.getboolean('migrate','allow_create_users')
    except Exception:
        config['allow_create_users'] = False

    # ----------------------------------------------------------------
    # - Loading operational and test judgingclass
    try:
        config['judging_operational'] = CNF.get('judging','operational')
        config['judging_test']        = CNF.get('judging','test')
    except Exception:
        exit('Problems reading necessary judging config!')

    # ----------------------------------------------------------------
    # - Some configs where the data are.
    #   data_moses: where Klaus Knuepffer stores the moses equations
    try:
        config['data_moses'] = CNF.get('data','moses')
    except Exception:
        exit('Problems rading all required data infos from config file')
    if not os.path.isdir( config['data_moses'] ):
        print("[WARNING] Could not find directory %s necessary for ComputeMoses" % config['data_moses'])
        print(" ComputeMoes will crash!")

    # NOTE(review): the message reads like a warning, but a missing
    # moses_out option has always stopped the script. This is kept as is;
    # the assignment which followed the exit call was unreachable.
    try:
        config['data_moses_out'] = CNF.get('data','moses_out')
    except Exception:
        exit('No [data] modes_out directory set, will not copy files to webserver.')
    # If folder does not exist: ignore
    if not os.path.isdir( config['data_moses_out'] ):
        print("[WARNING] Output directory for moses (moses_out=\"{0:s}\")".format(config['data_moses_out']))
        print(" does not exist, ignore!")
        config['data_moses_out'] = None

    # ----------------------------------------------------------------
    # - The rawdir is used by archive.py to import old
    #   wetterturnier data. Should never be used in the final version.
    try:
        config['rawdir'] = CNF.get('system','rawdir')
    except Exception:
        exit('Problems rading all required system infos from config file')

    # ----------------------------------------------------------------
    # - Reading all stations: option name -> integer value
    config['stations'] = {name: int(number) for name, number in CNF.items('stations')}

    # ----------------------------------------------------------------
    # - Adding inputs if set
    if inputs is not None:
        for key, value in inputs.items():
            # - Duplicated?
            if key in config:
                exit("inputs dict contains keys which are generated in this " + \
                     "method as well. Duplication! Exit. Key is: %s" % key)
            # - Else append, the string 'None' is converted to None
            config[key] = None if value == 'None' else value

    return config
# -------------------------------------------------------------------
# - Reading wmo ww conversion file.
# -------------------------------------------------------------------
class wmowwConversion( object ):
    """Helper class to convert observed weather types into the classes
    0/4/5/6/7/8/9 as used by Wetterturnier.de.

    Reads the wmo ww config file using :obj:`ConfigParser.ConfigParser`.
    The file contains the conversion rules from received observations to
    one of the main weather types (0,4,5,6,7,8,9). Can be used for present
    weather (ww) and past weather (w1/w2).

    The two sections read from the file are ``[past weather]`` and
    ``[present weather]``. Each option value has to look like
    ``<code> gets <newcode>`` or ``<code> gets None``.

    Args:
        file (:obj:`str`): String, file name containing the specifications.
            The script exits (exit code 8) if the file does not exist and
            (exit code 9) if it cannot be read.
    """

    # Conversion tables (dict) or None if the section is not defined
    _past_weather_    = None
    _present_weather_ = None

    def __init__( self, file ):
        import sys, os
        if not os.path.isfile( file ):
            print("Sorry, cannot find file \"{0:s}\"".format(file))
            sys.exit(8)

        # Save file
        self.file = file

        # Use configparser to read the file
        try:
            from ConfigParser import ConfigParser    # Python 2
        except ImportError:
            from configparser import ConfigParser    # Python 3
        CNF = ConfigParser()
        try:
            CNF.read( file )
        except Exception as e:
            print(e)
            print("ERROR reading the file \"{0:s}\"".format(file))
            sys.exit(9)

        self._past_weather_    = self._read_section_(CNF,"past weather")
        self._present_weather_ = self._read_section_(CNF,"present weather")

    def _read_section_( self, CNF, section ):
        """Reading specific section of the config file and parses the settings.

        The dict keys are "integers as string", the weather type code as
        found in front of ``gets``. The dict for each key contains a "name"
        and a "gets" entry. "name" is the option name of the setting,
        "gets" is either None (if defined to convert to None) or an integer
        (the new code flag in which the observed one should be converted).
        Entries which do not match ``<code> gets <newcode|None>`` are
        reported on stdout and ignored.

        Args:
            CNF (:obj:`ConfigParser.ConfigParser`): The config file handler.
            section (:obj:`str`): String, which section to be read.

        Returns:
            dict/None: None if section does not exist. Else a dict of dicts.
        """
        import re
        if not CNF.has_section(section): return None

        res = {}
        for rec in CNF.items(section):
            mtch = re.match(r"^([0-9]+)\s+gets\s+([0-9]+|None)$",rec[1])
            if not mtch:
                # rec is a (name, value) tuple, convert explicitly for "{:s}"
                print(" [!] Misspecified: \"{0:s}\". Ignore.".format(str(rec)))
                continue
            val = None if mtch.group(2) == "None" else int(mtch.group(2))
            # Use integer key as string key
            res[mtch.group(1)] = {"name":rec[0].strip(), "gets":val}
        return res

    def convert( self, section, code ):
        """Convert an observed weather code into the new one.

        "section" defines the conversion table. If set to "past" (or "w1" or "w2"; not
        case sensitive) the [past weather] conversion table will be used. If set to
        "present" (or "ww") the [present weather] will be used.
        Returns the value in which the observed "code" is converted. If no specification
        is available (code not listed, or the whole section is missing in the
        config file) None is returned.

        Args:
            section (:obj:`str`): One of past/w1/w2 or present/ww. Not case sensitive.
                The script exits for any other value.
            code (:obj:`int`): Integer with the observed code. If None is given, None
                will be returned. Same yields if "code" is no integer (None returned).

        Returns:
            int/None: Returns an integer or None, depending on the settings.
        """
        if code is None: return None
        # bool is a subclass of int but no valid weather code
        if isinstance(code, bool) or not isinstance(code, int): return None
        strcode = "{:d}".format(code)

        which = section.lower()
        if which in ("past","w1","w2"):
            lookup = self._past_weather_
        elif which in ("present","ww"):
            lookup = self._present_weather_
        else:
            import sys
            sys.exit("Problems with wmowwConversion.convert input argument \"section\". " + \
                     "Input section=\"{:s}\" is unknown.".format(section))

        # Section not defined in the config file: nothing to convert to
        if lookup is None: return None
        # If key is not defined: return None, else the convert-to value
        entry = lookup.get(strcode)
        return None if entry is None else entry["gets"]

    def _show_section_( self, title, table ):
        """Prints one conversion table (dict or None) to stdout."""
        print("\nwmoww Config Section \"{0:s}\":".format(title))
        if not table:
            print(" - Nothing defined")
            return
        for key,values in table.items():
            # Explicit None check: 0 is a valid class to convert to
            if values["gets"] is not None:
                print(" - {0:<50s}: {1:3d} gets {2:4d}".format(values["name"],int(key),values["gets"]))
            else:
                print(" - {0:<50s}: {1:3d} gets None".format(values["name"],int(key)))

    def show( self ):
        """Development method: prints content of the object to stdout.
        """
        self._show_section_("past weather",    self._past_weather_)
        self._show_section_("present weather", self._present_weather_)
# Helper function which creates a timestamp from a datetime object
def timestamp( dt ):
    """Return POSIX timestamp from datetime object as float.

    The datetime object is converted into a time tuple which is handed
    over to :func:`time.mktime`. Note that ``mktime`` interprets the time
    tuple as local time.

    Args:
        dt (:obj:`datetime.datetime`): Object providing ``timetuple()``.

    Returns:
        float: Seconds since epoch.
    """
    from time import mktime
    time_tuple = dt.timetuple()
    return mktime( time_tuple )
def datetime2tdate( datetime ):
    """Convert datetime object to tdate.

    Args:
        datetime (:obj:`datetime.datetime`): Object handed over to
            :meth:`utils.timestamp`.

    Returns:
        float: Timestamp divided by 86400 (seconds per day), rounded
        down with ``numpy.floor``.
    """
    seconds_since_epoch = timestamp( datetime )
    return np.floor( seconds_since_epoch / 86400 )
def tdate2datetime( tdate ):
    """Convert tdate to datetime object.

    Args:
        tdate (:obj:`int`): Days since 1970-01-01.

    Returns:
        datetime.datetime: Result of ``datetime.fromtimestamp`` for
        ``tdate * 86400`` seconds.
    """
    import datetime
    seconds_since_epoch = tdate * 86400
    return datetime.datetime.fromtimestamp( seconds_since_epoch )
# -------------------------------------------------------------------
# - Convert date since 1970-01-01 into a readable string
# -------------------------------------------------------------------
def tdate2string( tdate, short=False ):
    """Converts tdate into a date string.

    Note: a so called tdate is nothing else than an integer value
    indicating the days since 1970-01-01 which is used extensively
    in the wetterturnier (especially to optimize the databases).

    Args:
        tdate (:obj:`int`): Days since 1970-01-01
        short (:obj:`bool`): If set the short format ``%y%m%d`` is used.
            Default is False.

    Returns:
        string: Formatted string, format ``%Y-%m-%d`` (or ``%y%m%d``
        if ``short`` is set).
    """
    fmt = "%y%m%d" if short else "%Y-%m-%d"
    as_datetime = tdate2datetime( tdate )
    return as_datetime.strftime( fmt )
def string2tdate( datestring, moses = False ):
    """Converts a date string into a tdate (days since 1970-01-01).

    Opposite of :meth:`utils.tdate2string`. The result is computed by
    plain calendar arithmetic and therefore does not depend on the time
    zone of the machine (going via local midnight and ``mktime`` yields
    the previous day east of UTC).

    Args:
        datestring (:obj:`str`): Date of form ``YYYY-MM-DD``, or ``YYMMDD``
            if ``moses`` is set. Only the digit positions are read,
            trailing characters are ignored.
        moses (:obj:`bool`): If True the short ``YYMMDD`` format is expected.
            Default is False.

    Returns:
        int: Days since 1970-01-01.

    Raises:
        ValueError: If the string cannot be parsed or is no valid date.
    """
    from datetime import date
    if moses: #mosesYYMMDD
        year = int(datestring[0:2])
        mon  = int(datestring[2:4])
        day  = int(datestring[4:6])
        # Two digit years: 69-99 -> 1969-1999, 00-68 -> 2000-2068. This is
        # the POSIX convention Python 2's time.mktime applied implicitly.
        year += 1900 if year >= 69 else 2000
    else:
        year = int(datestring[0:4])
        mon  = int(datestring[5:7])
        day  = int(datestring[8:10])
    return ( date(year, mon, day) - date(1970, 1, 1) ).days
def timestamp2tdate( timestamp ):
    """Convert a timestamp (seconds since epoch) into a tdate.

    Args:
        timestamp: Numeric, seconds since 1970-01-01.

    Returns:
        int: Timestamp divided by 86400 (seconds per day), converted
        with :func:`int`.
    """
    days = timestamp / 86400
    return int( days )
def tdate2timestamp( tdate ):
    """Convert a tdate into a timestamp.

    Args:
        tdate: Numeric, days since 1970-01-01.

    Returns:
        ``tdate`` multiplied by 86400 (seconds per day).
    """
    seconds_per_day = 86400
    return tdate * seconds_per_day
def today_tdate():
    """Returns the current (UTC) date as tdate.

    ``time.time()`` counts seconds since 1970-01-01 00:00 UTC independent
    of the time zone of the machine. Feeding a naive ``utcnow()`` object
    into ``mktime`` would interpret the UTC wall clock as local time and
    shift the day boundary by the UTC offset.

    Returns:
        int: Days since 1970-01-01.
    """
    import time
    return int( time.time() // 86400 )
#today = int( dt.datetime.now().strftime('%s') / 86400 )
# -------------------------------------------------------------------
# - Manipulate special characters to get proper names
# -------------------------------------------------------------------
def nicename( string, conversion_table = None ):
    """Creates a nice username.

    Mainly used for migration where (from the text files of the old
    wetterturnier archive) a few usernames were including special characters,
    blanks, or other special things. The new Wetterturnier only allows
    proper non-special-character usernames. This function converts
    possibly improper usernames to their proper equivalent.

    Processing order: strip, optional renaming via ``conversion_table``,
    ``re.escape``, NFKD normalization plus ASCII encoding (characters
    without ASCII representation are dropped), followed by a fixed chain
    of string replacements. Uses the Python 2 builtin ``unicode``.

    Args:
        string (:obj:`str`): Possibly improper user name. Decoded as
            ISO-8859-1.
        conversion_table (:obj:`dict`): Dict used for string translation.
            Default None (unused).

    Returns:
        string: String with nice username without special characters.
        :meth:`utils.exit` is called if a backslash is left in the result.
    """
    import unicodedata
    import re
    import utils
    # Check whether conversion table is set. If: check if
    # User is in the conversion table keys. If so, rename
    string = string.strip()
    if conversion_table is not None:
        if string in conversion_table.keys():
            string = conversion_table[string]
    # Escape dangerous characters: re.escape puts a backslash in front
    # of special characters, the replacements below rely on this.
    string = re.escape(string).strip()
    # Decompose (NFKD) and drop everything which is not ASCII.
    nicename = unicodedata.normalize('NFKD', unicode(string,'ISO-8859-1')) \
            .encode('ascii', 'ignore')
    # NOTE(review): indentation of the original got lost; only the
    # slash replacement is assumed to belong to this condition - confirm.
    if not "Titisee" in nicename and not "Neustadt" in nicename:
        nicename = nicename.replace('/','_')
    nicename = nicename.replace('\/','')
    nicename = nicename.replace('\_','_')
    # Backslash followed by A/O/U/a/o/u is replaced by the two-letter
    # form; presumably what is left of an escaped umlaut after the
    # ASCII encoding above - TODO confirm.
    nicename = nicename.replace('\\A','Ae')
    nicename = nicename.replace('\\O','Oe')
    nicename = nicename.replace('\\U','Ue')
    nicename = nicename.replace('\\a','ae')
    nicename = nicename.replace('\\o','oe')
    nicename = nicename.replace('\\u','ue')
    # Removes every remaining backslash.
    # NOTE(review): from here on no pattern containing a backslash can
    # match anymore ('\)', '\(', '\+', '\-', '\.', '\e', '\m'), only the
    # replacements of plain characters still have an effect. Do not
    # reorder without checking the user names already stored.
    nicename = nicename.replace('\\','')
    nicename = nicename.replace('\)','')
    nicename = nicename.replace('\(','')
    nicename = nicename.replace(')','')
    nicename = nicename.replace('(','')
    nicename = nicename.replace('\+','')
    nicename = nicename.replace('+','')
    nicename = nicename.replace('\-','-')
    nicename = nicename.replace('\.','')
    nicename = nicename.replace(' ','_')
    nicename = nicename.replace('\e','e')
    nicename = nicename.replace('?','')
    # - This is only for the Grossmeister
    nicename = nicename.replace('\m','ss')
    # Safety net: stop if a backslash survived (cannot trigger as long
    # as all backslashes are removed above).
    if "\\" in nicename:
        utils.exit('nicename is with special ' + nicename)
    return nicename
# -------------------------------------------------------------------
# - Customized exit handling
# -------------------------------------------------------------------
def exit(msg="unknown error"):
    """Simple exit wrapper.

    Can be used to create kind of user-defined exit handling. Raises
    ``SystemExit`` carrying the message prefixed with ``[!] ERROR:``;
    the interpreter prints such a message to stderr when exiting.

    Args:
        msg (:obj:`str`): String, the error message which should be dropped (stderr).
    """
    message = '[!] ERROR: %s' % msg
    raise SystemExit(message)