class Geocoder(object):
    """Base class for all geocoders."""

    def geocode(self, string):
        """Geocode ``string``. Concrete geocoders must override this."""
        raise NotImplementedError


class WebGeocoder(Geocoder):
    """A Geocoder subclass with utility methods helpful for handling
    results given by web-based geocoders."""

    @classmethod
    def _get_encoding(cls, page, contents=None):
        """Get the last encoding (charset) listed in the header of ``page``.

        ``page`` must expose ``page.headers.getplist()``, returning the
        parameters of the Content-Type header as 'key=value' strings.

        ``contents``, if given, is the body of the page. When the header
        names no charset, the body is parsed as XML and the encoding of
        the resulting document is returned instead.

        Returns the charset name, or None if none could be determined.
        """
        plist = page.headers.getplist()
        if plist:
            # Walk the parameters backwards so the *last* charset wins.
            # Parameters without '=' or with several '=' signs (for
            # example a MIME boundary) are skipped instead of raising
            # ValueError on tuple unpacking.
            for param in reversed(plist):
                pieces = param.split('=', 1)
                if len(pieces) != 2:
                    continue
                key, value = pieces
                if key.strip().lower() != 'charset':
                    continue
                # The value may still carry its quotes: charset="utf-8".
                value = value.strip().strip('"\'')
                if value:
                    return value

        if contents:
            try:
                return xml.dom.minidom.parseString(contents).encoding
            except ExpatError:
                # Not well-formed XML: there is no declaration to read.
                pass
        return None

    @classmethod
    def _decode_page(cls, page):
        """Read ``page`` and return its contents re-encoded as UTF-8.

        The source encoding is taken from ``_get_encoding``; if that
        yields nothing, ``sys.getdefaultencoding()`` is used.
        """
        contents = page.read()
        encoding = cls._get_encoding(page, contents)
        if not encoding:
            encoding = sys.getdefaultencoding()
        return unicode(contents, encoding=encoding).encode('utf-8')

    @classmethod
    def _get_first_text(cls, node, tag_names, strip=None):
        """Get the text value of the first descendant of ``node`` whose
        tag is one of ``tag_names``.

        ``tag_names`` may be a single tag name or a sequence of names,
        which are tried in order. The first name that matches at least
        one element decides the result. The sequence is not modified.

        The text is stripped using the value of ``strip`` (see
        ``str.strip``). Returns None if ``node`` is false, if no tag
        matches, or if the matched element has no leading text.
        """
        if isinstance(tag_names, basestring):
            tag_names = [tag_names]
        if not node:
            return None

        # Iterate rather than pop(): popping emptied the caller's list.
        for tag_name in tag_names:
            nodes = node.getElementsByTagName(tag_name)
            if not nodes:
                continue
            child = nodes[0].firstChild
            # An element child has nodeValue None; only text can be
            # stripped.
            if child is None or child.nodeValue is None:
                return None
            return child.nodeValue.strip(strip)
        return None

    @classmethod
    def _join_filter(cls, sep, seq, pred=bool):
        """Join items in ``seq`` with string ``sep`` if pred(item) is True.

        Sequence items are passed to unicode() before joining.
        """
        return sep.join([unicode(item) for item in seq if pred(item)])
class MediaWiki(WebGeocoder):
    """Geocoder that reads coordinates from the ``geo.*`` meta tags of a
    MediaWiki page."""

    def __init__(self, format_url, transform_string=None):
        """Initialize a geocoder that can parse MediaWiki pages with the GIS
        extension enabled.

        ``format_url`` is a URL string containing '%s' where the page name to
        request will be interpolated. For example:
        'http://www.wiki.com/wiki/%s'

        ``transform_string`` is a callable that will make appropriate
        replacements to the input string before requesting the page. If None
        is given, the default transform_string which replaces ' ' with '_'
        will be used. It is recommended that you consider this argument
        keyword-only, since subclasses will likely place it last.
        """
        self.format_url = format_url
        if callable(transform_string):
            # Shadow the class-level default with the caller's callable.
            self.transform_string = transform_string

    @classmethod
    def transform_string(cls, string):
        """Do the MediaWiki dance: replace spaces with underscores."""
        return string.replace(' ', '_')

    def geocode(self, string):
        """Geocode the wiki page named by ``string``.

        Returns whatever ``geocode_url`` returns for the page's URL.
        """
        wiki_string = self.transform_string(string)
        url = self.format_url % wiki_string
        return self.geocode_url(url)

    def geocode_url(self, url):
        """Fetch ``url`` and return ``(name, (latitude, longitude))`` as
        parsed by ``parse_xhtml``."""
        print("Fetching %s..." % url)
        page = urlopen(url)
        try:
            name, (latitude, longitude) = self.parse_xhtml(page)
        finally:
            # Release the connection even when parsing fails.
            page.close()
        return (name, (latitude, longitude))

    def parse_xhtml(self, page):
        """Parse the place name and position from the meta tags of ``page``.

        ``page`` is either a BeautifulSoup instance or markup that
        BeautifulSoup can parse. Returns ``(name, (latitude, longitude))``;
        each element is None when the page does not provide it.
        """
        if isinstance(page, BeautifulSoup):
            soup = page
        else:
            soup = BeautifulSoup(page)

        head = soup.head
        if head is None:
            # No head element means there are no meta tags to read.
            return (None, (None, None))

        name = None
        meta = head.find('meta', {'name': 'geo.placename'})
        if meta is not None:
            name = meta.get('content') or None

        latitude = longitude = None
        meta = head.find('meta', {'name': 'geo.position'})
        if meta is not None:
            position = meta.get('content')
            if position:
                latitude, longitude = util.parse_geo(position)
                # NOTE(review): a zero in either coordinate is treated as
                # "not geocoded", which also discards genuine positions on
                # the equator or the prime meridian -- confirm this is
                # intended before relaxing it.
                if latitude == 0 or longitude == 0:
                    latitude = longitude = None
        return (name, (latitude, longitude))


class SemanticMediaWiki(MediaWiki):
    """MediaWiki geocoder that falls back to Semantic MediaWiki RDF data
    when a page carries no usable meta tags."""

    def __init__(self, format_url, attributes=None, relations=None,
                 prefer_semantic=False, transform_string=None):
        """Initialize a geocoder that can parse MediaWiki pages with the GIS
        extension enabled, and can follow Semantic MediaWiki relations until
        a geocoded page is found.

        ``attributes`` is a sequence of semantic attribute names that can
        contain geographical coordinates. They will be tried, in order, if
        the page is not geocoded with the GIS extension. A single attribute
        may be passed as a string.
        For example: attributes=['geographical coordinate']
                 or: attributes='geographical coordinate'

        ``relations`` is a sequence of semantic relation names that will be
        followed, depth-first in order, until a geocoded page is found. A
        single relation name may be passed as a string.
        For example: relations=['Located in']
                 or: relations='Located in'

        ``prefer_semantic`` indicates whether or not the contents of the
        semantic attributes (given by ``attributes``) should be preferred
        over the GIS extension's coordinates if both exist. This defaults to
        False, since making it True will cause every page's RDF to be
        requested when it often won't be necessary.
        """
        super(SemanticMediaWiki, self).__init__(format_url, transform_string)
        self.attributes = self._name_sequence(attributes)
        self.relations = self._name_sequence(relations)
        self.prefer_semantic = prefer_semantic

    @staticmethod
    def _name_sequence(names):
        """Normalize None / a single string / a sequence to a sequence."""
        if names is None:
            return []
        if isinstance(names, basestring):
            return [names]
        return names

    def transform_semantic(self, string):
        """Normalize semantic attribute and relation names by replacing
        spaces with underscores and capitalizing the result."""
        return string.replace(' ', '_').capitalize()

    def geocode_url(self, url, tried=None):
        """Fetch ``url`` and return ``(name, (latitude, longitude))``.

        The page's meta tags are tried first. If they are incomplete, or
        ``prefer_semantic`` is set, the page's RDF export is fetched and the
        configured attributes are tried; failing that, the configured
        relations are followed recursively.

        ``tried`` is the set of URLs already visited. It is used to avoid
        following cyclic relations and is updated in place.
        """
        if tried is None:
            tried = set()
        # Record the page being fetched so a relation that points back to
        # it is recognized as a cycle.
        tried.add(url)

        print("Fetching %s..." % url)
        page = urlopen(url)
        try:
            soup = BeautifulSoup(page)
        finally:
            page.close()
        name, (latitude, longitude) = self.parse_xhtml(soup)

        complete = None not in (name, latitude, longitude)
        if complete and not self.prefer_semantic:
            return (name, (latitude, longitude))

        rdf_url = self.parse_rdf_link(soup)
        if rdf_url is None:
            # No RDF export is advertised; keep what the XHTML provided
            # instead of trying to open a None URL.
            return (name, (latitude, longitude))

        print("Fetching %s..." % rdf_url)
        page = urlopen(rdf_url)
        try:
            things, thing = self.parse_rdf(page)
        finally:
            page.close()
        if thing is None:
            # The export describes nothing, so there is nothing to query.
            return (name, (latitude, longitude))

        name = self.get_label(thing)
        for attribute, value in self.get_attributes(thing):
            latitude, longitude = util.parse_geo(value)
            if None not in (latitude, longitude):
                break

        if None in (latitude, longitude):
            for relation, resource in self.get_relations(thing):
                url = things.get(resource, resource)
                if url in tried:
                    # Avoid cyclic relationships.
                    continue
                tried.add(url)
                name, (latitude, longitude) = self.geocode_url(url, tried)
                if None not in (name, latitude, longitude):
                    break

        return (name, (latitude, longitude))

    def parse_rdf_link(self, page, mime_type='application/rdf+xml'):
        """Parse the URL of the RDF link from the head of ``page``.

        Returns the ``href`` of the first ``link`` with rel="alternate" and
        the given ``mime_type``, or None if there is no such link.
        """
        if isinstance(page, BeautifulSoup):
            soup = page
        else:
            soup = BeautifulSoup(page)

        head = soup.head
        if head is None:
            return None
        link = head.find('link', rel='alternate', type=mime_type)
        if link is None:
            return None
        return link.get('href') or None

    def parse_rdf(self, page):
        """Parse an RDF export into ``(things, thing)``.

        ``page`` is an XML string or a page object to be decoded first.

        ``things`` maps the ``rdf:about`` value of each ``smw:Thing`` to the
        ``rdf:resource`` of its ``smw:hasArticle`` child. ``thing`` is the
        first ``smw:Thing`` element in document order, or None when the
        document contains none.
        """
        if not isinstance(page, basestring):
            page = self._decode_page(page)
        doc = xml.dom.minidom.parseString(page)

        things = {}
        thing = None
        # Iterating in reverse leaves ``thing`` bound to the first element.
        for thing in reversed(doc.getElementsByTagName('smw:Thing')):
            name = thing.attributes['rdf:about'].value
            articles = thing.getElementsByTagName('smw:hasArticle')
            if articles:
                things[name] = articles[0].attributes['rdf:resource'].value
        # ``thing`` should now be the semantic data for the exported page.
        return (things, thing)

    def get_label(self, thing):
        """Return the text of the first ``rdfs:label`` under ``thing``."""
        return self._get_first_text(thing, 'rdfs:label')

    def get_attributes(self, thing, attributes=None):
        """Yield ``(attribute, value)`` for every semantic attribute element
        found under ``thing``.

        ``attributes`` defaults to ``self.attributes``. Names are normalized
        with ``transform_semantic``. Elements with no text are skipped.
        """
        if attributes is None:
            attributes = self.attributes

        for attribute in attributes:
            attribute = self.transform_semantic(attribute)
            for node in thing.getElementsByTagName('attribute:' + attribute):
                child = node.firstChild
                if child is None or child.nodeValue is None:
                    # An empty element carries no coordinates to parse.
                    continue
                yield (attribute, child.nodeValue.strip())

    def get_relations(self, thing, relations=None):
        """Yield ``(relation, resource)`` for every semantic relation element
        found under ``thing``.

        ``relations`` defaults to ``self.relations``. Names are normalized
        with ``transform_semantic``; ``resource`` is the element's
        ``rdf:resource`` attribute.
        """
        if relations is None:
            relations = self.relations

        for relation in relations:
            relation = self.transform_semantic(relation)
            for node in thing.getElementsByTagName('relation:' + relation):
                resource = node.attributes['rdf:resource'].value
                yield (relation, resource)
``resource`` is the HTTP resource to give the query parameter. 'maps/geo' is the HTTP geocoder and is a documented API resource. 'maps' is the actual Google Maps interface and its use for just geocoding is undocumented. Anything else probably won't work. ``format_string`` is a string containing '%s' where the string to geocode should be interpolated before querying the geocoder. For example: '%s, Mountain View, CA'. The default is just '%s'. ``output_format`` can be 'json', 'xml', 'kml', 'csv', or 'js' and will control the output format of Google's response. The default is 'kml' since it is supported by both the 'maps' and 'maps/geo' resources. The 'js' format is the most likely to break since it parses Google's JavaScript, which could change. However, it currently returns the best results for restricted geocoder areas such as the UK. """ self.api_key = api_key self.domain = domain self.resource = resource self.format_string = format_string self.output_format = output_format @property def url(self): domain = self.domain.strip('/') resource = self.resource.strip('/') return "http://%(domain)s/%(resource)s?%%s" % locals() def geocode(self, string, exactly_one=True): params = {'q': self.format_string % string, 'output': self.output_format.lower(), } if self.resource.rstrip('/').endswith('geo'): # An API key is only required for the HTTP geocoder. params['key'] = self.api_key url = self.url % urlencode(params) return self.geocode_url(url, exactly_one) def geocode_url(self, url, exactly_one=True): print "Fetching %s..." % url page = urlopen(url) dispatch = getattr(self, 'parse_' + self.output_format) return dispatch(page, exactly_one) def parse_xml(self, page, exactly_one=True): """Parse a location name, latitude, and longitude from an XML response. 
""" if not isinstance(page, basestring): page = self._decode_page(page) try: doc = xml.dom.minidom.parseString(page) except ExpatError: places = [] else: places = doc.getElementsByTagName('Placemark') if exactly_one and len(places) != 1: raise ValueError("Didn't find exactly one placemark! " \ "(Found %d.)" % len(places)) def parse_place(place): location = self._get_first_text(place, ['address', 'name']) or None points = place.getElementsByTagName('Point') point = points and points[0] or None coords = self._get_first_text(point, 'coordinates') or None if coords: longitude, latitude = [float(f) for f in coords.split(',')[:2]] else: latitude = longitude = None _, (latitude, longitude) = self.geocode(location) return (location, (latitude, longitude)) if exactly_one: return parse_place(places[0]) else: return (parse_place(place) for place in places) def parse_csv(self, page, exactly_one=True): raise NotImplementedError def parse_kml(self, page, exactly_one=True): return self.parse_xml(page, exactly_one) def parse_json(self, page, exactly_one=True): if not isinstance(page, basestring): page = self._decode_page(page) json = simplejson.loads(page) places = json.get('Placemark', []) if exactly_one and len(places) != 1: raise ValueError("Didn't find exactly one placemark! " \ "(Found %d.)" % len(places)) def parse_place(place): location = place.get('address') longitude, latitude = place['Point']['coordinates'][:2] return (location, (latitude, longitude)) if exactly_one: return parse_place(places[0]) else: return (parse_place(place) for place in places) def parse_js(self, page, exactly_one=True): """This parses JavaScript returned by queries the actual Google Maps interface and could thus break easily. However, this is desirable if the HTTP geocoder doesn't work for addresses in your country (the UK, for example). """ if not isinstance(page, basestring): page = self._decode_page(page) LATITUDE = r"[\s,]lat:\s*(?P