import binascii
import datetime
import hmac
import locale
import mimetypes
import os
import random
import re
import smtplib
import sys
import time

if sys.version_info[0] >= 3:
    # Python 3 relocated these modules. Bind them to their Python 2 names so
    # the rest of the file can keep calling urllib.quote(), httplib, etc.
    import html.entities as htmlentitydefs
    import http.client as httplib
    import urllib.parse as urllib
    import urllib.parse as urlparse
    unichr = chr
    unicode = str
else:
    import htmlentitydefs
    import httplib
    import urllib
    import urlparse

# Matches "{name}" placeholders inside an API path template.
re_path_template = re.compile(r'{\w+}')

HTTP_METHOD = 'GET'
VERSION = '1.0'

# OS-backed random source used for OAuth nonces and verifiers.
_system_random = random.SystemRandom()


class TweepError(Exception):
    """Tweepy exception.

    Attributes:
        reason: the error message, always stored as ``str(reason)``.
        response: the HTTP response that triggered the error, or None.
    """

    def __init__(self, reason, response=None):
        Exception.__init__(self, reason)
        self.reason = str(reason)
        self.response = response

    def __str__(self):
        return self.reason


def parse_a_href(atag):
    """Return the text between the first pair of double quotes in *atag*."""
    start = atag.find('"') + 1
    end = atag.find('"', start)
    return atag[start:end]


def _parse_utc_datetime(string, fmt):
    """Parse *string* with *fmt* under the 'C' time locale.

    English day/month names must be parsed regardless of the process locale,
    so LC_TIME is switched to 'C' for the call and then put back to whatever
    it was before, even when parsing raises.
    """
    previous = locale.setlocale(locale.LC_TIME)
    locale.setlocale(locale.LC_TIME, 'C')
    try:
        # ``datetime`` is the module here, so the class must be qualified.
        return datetime.datetime(*(time.strptime(string, fmt)[0:6]))
    finally:
        locale.setlocale(locale.LC_TIME, previous)


def parse_search_datetime(string):
    """Parse a timestamp such as 'Wed, 27 Aug 2008 13:08:45 +0000'.

    Returns a naive ``datetime.datetime``; raises ValueError on mismatch.
    """
    return _parse_utc_datetime(string, '%a, %d %b %Y %H:%M:%S +0000')


def unescape_html(text):
    """Replace HTML character references and named entities in *text*.

    Created by Fredrik Lundh
    (http://effbot.org/zone/re-sub.htm#unescape-html).
    Unknown or malformed references are left untouched.
    """
    def fixup(m):
        text = m.group(0)
        if text[:2] == "&#":
            # numeric character reference
            try:
                if text[:3] == "&#x":
                    return unichr(int(text[3:-1], 16))
                else:
                    return unichr(int(text[2:-1]))
            except (ValueError, OverflowError):
                pass
        else:
            # named entity
            try:
                text = unichr(htmlentitydefs.name2codepoint[text[1:-1]])
            except KeyError:
                pass
        return text  # leave as is
    return re.sub(r"&#?\w+;", fixup, text)


def parse_html_value(html):
    """Return the text between the first '>' and the last '<' of *html*."""
    return html[html.find('>') + 1:html.rfind('<')]


def parse_datetime(string):
    """Parse a timestamp such as 'Wed Aug 27 13:08:45 +0000 2008'.

    Returns a naive ``datetime.datetime``; raises ValueError on mismatch.
    """
    return _parse_utc_datetime(string, '%a %b %d %H:%M:%S +0000 %Y')


def build_authenticate_header(realm=''):
    """Optional WWW-Authenticate header (401 error)."""
    return {'WWW-Authenticate': 'OAuth realm="%s"' % realm}


def escape(s):
    """Escape a URL including any /."""
    return urllib.quote(s, safe='~')


def convert_to_utf8_str(arg):
    """Return *arg* as a native ``str``.

    Originally written by Michael Norton (http://docondev.blogspot.com/).
    On Python 2, unicode objects are encoded to UTF-8 byte strings. On
    Python 3, bytes are decoded from UTF-8. Any other object goes through
    ``str()``.
    """
    if isinstance(arg, str):
        return arg
    if isinstance(arg, unicode):
        # Only reachable on Python 2, where unicode is distinct from str.
        return arg.encode('utf-8')
    if isinstance(arg, bytes):
        # Only reachable on Python 3, where bytes is distinct from str.
        return arg.decode('utf-8')
    return str(arg)


def _utf8_str(s):
    """Convert unicode to utf-8."""
    return convert_to_utf8_str(s)


def generate_timestamp():
    """Get seconds since epoch (UTC)."""
    return int(time.time())


def _random_digits(length):
    """Return a string of *length* random decimal digits."""
    return ''.join([str(_system_random.randint(0, 9)) for _ in range(length)])


def generate_nonce(length=8):
    """Generate pseudorandom number."""
    return _random_digits(length)


def generate_verifier(length=8):
    """Generate pseudorandom number."""
    return _random_digits(length)


class Model(object):
    """Base class for objects built from API JSON payloads."""

    def __init__(self, api=None):
        self._api = api

    def __getstate__(self):
        """Return the instance state for pickling, without the API reference."""
        state = dict(self.__dict__)
        state.pop('_api', None)  # do not pickle the API reference
        return state

    @classmethod
    def parse(cls, api, json):
        """Parse a JSON object into a model instance."""
        raise NotImplementedError

    @classmethod
    def parse_list(cls, api, json_list):
        """Parse a list of JSON objects into a result set of model instances."""
        # NOTE(review): ResultSet is not defined in the visible part of this
        # file; presumably it is declared elsewhere - confirm.
        results = ResultSet()
        for obj in json_list:
            results.append(cls.parse(api, obj))
        return results


class User(Model):
    """A Twitter user; helper methods call back into the bound API object."""

    @classmethod
    def parse(cls, api, json):
        """Build a User from a JSON dict; a nested 'status' becomes a Status."""
        user = cls(api)
        for k, v in json.items():
            if k == 'status':
                setattr(user, k, Status.parse(api, v))
            elif k == 'following':
                # twitter sets this to null if it is false
                setattr(user, k, v is True)
            else:
                setattr(user, k, v)
        return user

    @classmethod
    def parse_list(cls, api, json_list):
        """Parse either a bare list of users or a dict holding a 'users' list."""
        if isinstance(json_list, list):
            item_list = json_list
        else:
            item_list = json_list['users']
        return super(User, cls).parse_list(api, item_list)

    def timeline(self, **kargs):
        return self._api.user_timeline(user_id=self.id, **kargs)

    def friends(self, **kargs):
        return self._api.friends(user_id=self.id, **kargs)

    def followers(self, **kargs):
        return self._api.followers(user_id=self.id, **kargs)

    def follow(self):
        self._api.create_friendship(user_id=self.id)
        self.following = True

    def unfollow(self):
        self._api.destroy_friendship(user_id=self.id)
        self.following = False

    def lists_memberships(self, *args, **kargs):
        return self._api.lists_memberships(user=self.screen_name, *args, **kargs)

    def lists_subscriptions(self, *args, **kargs):
        return self._api.lists_subscriptions(user=self.screen_name, *args, **kargs)

    def lists(self, *args, **kargs):
        return self._api.lists(user=self.screen_name, *args, **kargs)

    def followers_ids(self, *args, **kargs):
        return self._api.followers_ids(user_id=self.id, *args, **kargs)
class DirectMessage(Model):
    """A direct message; 'sender' and 'recipient' are parsed into User objects."""

    @classmethod
    def parse(cls, api, json):
        """Build a DirectMessage from a JSON dict."""
        dm = cls(api)
        for k, v in json.items():
            if k == 'sender' or k == 'recipient':
                setattr(dm, k, User.parse(api, v))
            else:
                # every other field, including created_at, is stored raw
                setattr(dm, k, v)
        return dm

    def destroy(self):
        return self._api.destroy_direct_message(self.id)


class Friendship(Model):
    """One side (source or target) of a friendships/show relationship."""

    @classmethod
    def parse(cls, api, json):
        """Return a ``(source, target)`` pair built from json['relationship']."""
        relationship = json['relationship']

        # parse source
        source = cls(api)
        for k, v in relationship['source'].items():
            setattr(source, k, v)

        # parse target
        target = cls(api)
        for k, v in relationship['target'].items():
            setattr(target, k, v)

        return source, target


class SavedSearch(Model):
    """A saved search; every JSON field is copied onto the instance as-is."""

    @classmethod
    def parse(cls, api, json):
        """Build a SavedSearch from a JSON dict."""
        ss = cls(api)
        for k, v in json.items():
            setattr(ss, k, v)
        return ss

    def destroy(self):
        return self._api.destroy_saved_search(self.id)


class SearchResult(Model):
    """A single hit returned by the search API."""

    @classmethod
    def parse(cls, api, json):
        """Build a SearchResult from a JSON dict.

        'created_at' is parsed with parse_search_datetime, and 'source' is
        HTML-unescaped and reduced to its inner text.
        """
        # Pass the api through so the instance keeps its API reference, as
        # every other model does.
        result = cls(api)
        for k, v in json.items():
            if k == 'created_at':
                setattr(result, k, parse_search_datetime(v))
            elif k == 'source':
                setattr(result, k, parse_html_value(unescape_html(v)))
            else:
                setattr(result, k, v)
        return result

    @classmethod
    def parse_list(cls, api, json_list, result_set=None):
        """Parse a search response dict into a ResultSet.

        The paging metadata found in the response is copied onto the result
        set; absent keys become None. *result_set* is accepted for interface
        compatibility and is not used.
        """
        results = ResultSet()
        for key in ('max_id', 'since_id', 'refresh_url', 'next_page',
                    'results_per_page', 'page', 'completed_in', 'query'):
            setattr(results, key, json_list.get(key))

        for obj in json_list['results']:
            results.append(cls.parse(api, obj))
        return results
class List(Model):
    """A Twitter list; helper methods call back into the bound API object."""

    @classmethod
    def parse(cls, api, json):
        """Build a list model from a JSON dict; 'user' becomes a User."""
        # Use cls rather than List so subclasses get instances of themselves.
        lst = cls(api)
        for k, v in json.items():
            if k == 'user':
                setattr(lst, k, User.parse(api, v))
            else:
                setattr(lst, k, v)
        return lst

    @classmethod
    def parse_list(cls, api, json_list, result_set=None):
        """Parse the 'lists' entry of a response dict into a ResultSet.

        *result_set* is accepted for interface compatibility and is not used.
        """
        results = ResultSet()
        for obj in json_list['lists']:
            results.append(cls.parse(api, obj))
        return results

    def update(self, **kargs):
        return self._api.update_list(self.slug, **kargs)

    def destroy(self):
        return self._api.destroy_list(self.slug)

    def timeline(self, **kargs):
        return self._api.list_timeline(self.user.screen_name, self.slug, **kargs)

    def add_member(self, id):
        return self._api.add_list_member(self.slug, id)

    def remove_member(self, id):
        return self._api.remove_list_member(self.slug, id)

    def members(self, **kargs):
        return self._api.list_members(self.user.screen_name, self.slug, **kargs)

    def is_member(self, id):
        return self._api.is_list_member(self.user.screen_name, self.slug, id)

    def subscribe(self):
        return self._api.subscribe_list(self.user.screen_name, self.slug)

    def unsubscribe(self):
        return self._api.unsubscribe_list(self.user.screen_name, self.slug)

    def subscribers(self, **kargs):
        return self._api.list_subscribers(self.user.screen_name, self.slug, **kargs)

    def is_subscribed(self, id):
        return self._api.is_subscribed_list(self.user.screen_name, self.slug, id)


class JSONModel(Model):
    """Pass-through model: the decoded JSON is returned unchanged."""

    @classmethod
    def parse(cls, api, json):
        return json


class IDModel(Model):
    """Model for id-list payloads."""

    @classmethod
    def parse(cls, api, json):
        """Return the id list: either the payload itself or its 'ids' entry."""
        if isinstance(json, list):
            return json
        else:
            return json['ids']
class Status(Model):
    """A tweet; helper methods call back into the bound API object."""

    @classmethod
    def parse(cls, api, json):
        """Build a Status from a JSON dict.

        'user' is parsed into a User and stored as both ``author`` and
        ``user``. An HTML 'source' is split into its text (``source``) and
        link (``source_url``). 'retweeted_status' is parsed recursively.
        """
        status = cls(api)
        for k, v in json.items():
            if k == 'user':
                user = User.parse(api, v)
                setattr(status, 'author', user)
                setattr(status, 'user', user)  # DEPRECATED
            elif k == 'source':
                if '<' in v:
                    setattr(status, k, parse_html_value(v))
                    setattr(status, 'source_url', parse_a_href(v))
                else:
                    setattr(status, k, v)
            elif k == 'retweeted_status':
                setattr(status, k, Status.parse(api, v))
            else:
                setattr(status, k, v)
        return status

    def destroy(self):
        return self._api.destroy_status(self.id)

    def retweet(self):
        return self._api.retweet(self.id)

    def retweets(self):
        return self._api.retweets(self.id)

    def favorite(self):
        return self._api.create_favorite(self.id)


class ModelFactory(object):
    """
    Used by parsers for creating instances of models.
    You may subclass this factory to add your own extended models.
    """

    # Attribute names are the payload_type strings used by bind_api().
    status = Status
    user = User
    direct_message = DirectMessage
    friendship = Friendship
    saved_search = SavedSearch
    search_result = SearchResult
    list = List

    json = JSONModel
    ids = IDModel


def import_simplejson():
    """Return the first importable JSON module.

    Tries ``simplejson``, then the standard ``json`` module, then Django's
    bundled copy. Raises ImportError when none is available.
    """
    try:
        import simplejson as json
    except ImportError:
        try:
            import json  # Python 2.6+
        except ImportError:
            try:
                # Google App Engine
                from django.utils import simplejson as json
            except ImportError:
                raise ImportError("Can't load a json library")
    return json


class Parser(object):
    """Interface for response payload parsers."""

    def parse(self, method, payload):
        """
        Parse the response payload and return the result.
        Returns a tuple that contains the result data and the cursors
        (or None if not present).
        """
        raise NotImplementedError

    def parse_error(self, payload):
        """
        Parse the error message from payload.
        If unable to parse the message, throw an exception
        and default error message will be used.
        """
        raise NotImplementedError


class JSONParser(Parser):
    """Parser that decodes payloads into plain JSON structures."""

    payload_format = 'json'

    def __init__(self):
        self.json_lib = import_simplejson()

    def parse(self, method, payload):
        """Decode *payload*.

        Returns the decoded JSON. When it is a dict holding both
        'previous_cursor' and 'next_cursor', returns
        ``(json, (previous_cursor, next_cursor))`` instead.
        Raises TweepError when the payload cannot be decoded.
        """
        try:
            json = self.json_lib.loads(payload)
        except Exception as e:
            raise TweepError('Failed to parse JSON payload: %s' % e)

        if (isinstance(json, dict)
                and 'previous_cursor' in json and 'next_cursor' in json):
            cursors = json['previous_cursor'], json['next_cursor']
            return json, cursors
        else:
            return json

    def parse_error(self, payload):
        """Return the 'error' (or else 'errors') entry of an error payload."""
        error = self.json_lib.loads(payload)
        if 'error' in error:
            return error['error']
        else:
            return error['errors']


class ModelParser(JSONParser):
    """Parser that turns JSON payloads into model instances."""

    def __init__(self, model_factory=None):
        JSONParser.__init__(self)
        self.model_factory = model_factory or ModelFactory

    def parse(self, method, payload):
        """Parse *payload* into the model named by ``method.payload_type``.

        Returns None when the method declares no payload type. Returns
        ``(result, cursors)`` when the payload carried cursors, otherwise
        just the result. Raises TweepError when the factory has no model
        for the payload type.
        """
        try:
            if method.payload_type is None:
                return
            model = getattr(self.model_factory, method.payload_type)
        except AttributeError:
            raise TweepError('No model for this payload type: %s'
                             % method.payload_type)

        json = JSONParser.parse(self, method, payload)
        if isinstance(json, tuple):
            json, cursors = json
        else:
            cursors = None

        if method.payload_list:
            result = model.parse_list(method.api, json)
        else:
            result = model.parse(method.api, json)

        if cursors:
            return result, cursors
        else:
            return result


def bind_api(**config):
    """Build a callable that performs one API request.

    Recognised *config* keys: ``path`` (required; may contain ``{name}``
    placeholders), ``payload_type``, ``payload_list``, ``allowed_param``,
    ``method``, ``require_auth`` and ``search_api``.

    The returned function takes the API object as first argument, followed
    by positional and keyword request parameters. The keywords ``post_data``,
    ``retry_count``, ``retry_delay``, ``retry_errors`` and ``headers`` are
    consumed as per-call options. It gets a ``pagination_mode`` attribute
    ('cursor' or 'page') when ``allowed_param`` contains that name.
    """

    class APIMethod(object):

        path = config['path']
        payload_type = config.get('payload_type', None)
        payload_list = config.get('payload_list', False)
        allowed_param = config.get('allowed_param', [])
        method = config.get('method', 'GET')
        require_auth = config.get('require_auth', False)
        search_api = config.get('search_api', False)

        def __init__(self, api, args, kargs):
            # If authentication is required and no credentials
            # are provided, throw an error.
            if self.require_auth and not api.auth:
                raise TweepError('Authentication required!')

            self.api = api
            self.post_data = kargs.pop('post_data', None)
            self.retry_count = kargs.pop('retry_count', api.retry_count)
            self.retry_delay = kargs.pop('retry_delay', api.retry_delay)
            self.retry_errors = kargs.pop('retry_errors', api.retry_errors)
            # Copy so the Host header set below never leaks into the
            # caller's dict.
            self.headers = dict(kargs.pop('headers', {}))
            self.build_parameters(args, kargs)

            # Pick correct URL root to use
            if self.search_api:
                self.api_root = api.search_root
            else:
                self.api_root = api.api_root

            # Perform any path variable substitution
            self.build_path()

            if api.secure:
                self.scheme = 'https://'
            else:
                self.scheme = 'http://'

            if self.search_api:
                self.host = api.search_host
            else:
                self.host = api.host

            # Manually set Host header to fix an issue in python 2.5
            # or older where Host is set including the 443 port.
            # This causes Twitter to issue 301 redirect.
            # See Issue http://github.com/joshthecoder/tweepy/issues/#issue/12
            self.headers['Host'] = self.host

        def build_parameters(self, args, kargs):
            """Map positional and keyword arguments onto request parameters.

            Raises TweepError for surplus positional arguments or when a
            parameter is given both positionally and by keyword.
            """
            self.parameters = {}
            for idx, arg in enumerate(args):
                try:
                    name = self.allowed_param[idx]
                except IndexError:
                    raise TweepError('Too many parameters supplied!')
                self.parameters[name] = convert_to_utf8_str(arg)

            for k, arg in kargs.items():
                if arg is None:
                    continue
                if k in self.parameters:
                    raise TweepError(
                        'Multiple values for parameter %s supplied!' % k)
                self.parameters[k] = convert_to_utf8_str(arg)

        def build_path(self):
            """Substitute ``{name}`` placeholders in the path.

            Parameters consumed this way are removed from the query
            parameters. A missing 'user' falls back to the authenticated
            username when auth is configured.
            """
            for variable in re_path_template.findall(self.path):
                name = variable.strip('{}')

                if (name == 'user' and 'user' not in self.parameters
                        and self.api.auth):
                    # No 'user' parameter provided, fetch it from Auth instead.
                    value = self.api.auth.get_username()
                else:
                    try:
                        value = urllib.quote(self.parameters[name])
                    except KeyError:
                        raise TweepError(
                            'No parameter value found for path variable: %s'
                            % name)
                    del self.parameters[name]

                self.path = self.path.replace(variable, value)

        def _restore_api(self, cached):
            """Re-attach the API reference to cached model instances."""
            items = cached if isinstance(cached, list) else [cached]
            for item in items:
                # Plain JSON payloads (dicts, id lists) carry no _api slot.
                if isinstance(item, Model):
                    item._api = self.api

        def execute(self):
            """Send the request and return the parsed result.

            Raises TweepError when the request cannot be sent or the final
            response status is not 200.
            """
            # Build the request URL
            url = self.api_root + self.path
            if len(self.parameters):
                url = '%s?%s' % (url, urllib.urlencode(self.parameters))

            # Query the cache if one is available
            # and this request uses a GET method.
            if self.api.cache and self.method == 'GET':
                cache_result = self.api.cache.get(url)
                # if cache result found and not expired, return it
                if cache_result:
                    self._restore_api(cache_result)
                    return cache_result

            # Continue attempting request until successful
            # or maximum number of retries is reached.
            max_attempts = self.retry_count + 1
            retries_performed = 0
            while retries_performed < max_attempts:
                # Open connection
                # FIXME: add timeout
                if self.api.secure:
                    conn = httplib.HTTPSConnection(self.host)
                else:
                    conn = httplib.HTTPConnection(self.host)

                # Apply authentication
                if self.api.auth:
                    self.api.auth.apply_auth(
                        self.scheme + self.host + url,
                        self.method, self.headers, self.parameters
                    )

                # Execute request
                try:
                    conn.request(self.method, url,
                                 headers=self.headers, body=self.post_data)
                    resp = conn.getresponse()
                except Exception as e:
                    conn.close()
                    raise TweepError('Failed to send request: %s' % e)

                # Exit request loop if non-retry error code
                if self.retry_errors:
                    if resp.status not in self.retry_errors:
                        break
                else:
                    if resp.status == 200:
                        break

                retries_performed += 1
                if retries_performed < max_attempts:
                    # Another attempt follows: drop this connection and
                    # back off. The last attempt's connection stays open
                    # because its response is still read below.
                    conn.close()
                    time.sleep(self.retry_delay)

            # If an error was returned, throw an exception
            self.api.last_response = resp
            if resp.status != 200:
                try:
                    error_msg = self.api.parser.parse_error(resp.read())
                except Exception:
                    error_msg = ("Twitter error response: status code = %s"
                                 % resp.status)
                conn.close()
                raise TweepError(error_msg, resp)

            # Parse the response payload
            try:
                result = self.api.parser.parse(self, resp.read())
            finally:
                conn.close()

            # Store result into cache if one is available.
            if self.api.cache and self.method == 'GET' and result:
                self.api.cache.store(url, result)

            return result

    def _call(api, *args, **kargs):
        method = APIMethod(api, args, kargs)
        return method.execute()

    # Set pagination mode
    if 'cursor' in APIMethod.allowed_param:
        _call.pagination_mode = 'cursor'
    elif 'page' in APIMethod.allowed_param:
        _call.pagination_mode = 'page'

    return _call
class API(object):
    """Twitter API"""

    def __init__(self, auth_handler=None,
                 host='api.twitter.com', search_host='search.twitter.com',
                 cache=None, secure=False, api_root='/1', search_root='',
                 retry_count=0, retry_delay=0, retry_errors=None,
                 parser=None):
        """Store the connection settings; *parser* defaults to ModelParser()."""
        self.auth = auth_handler
        self.host = host
        self.search_host = search_host
        self.api_root = api_root
        self.search_root = search_root
        self.cache = cache
        self.secure = secure
        self.retry_count = retry_count
        self.retry_delay = retry_delay
        self.retry_errors = retry_errors
        self.parser = parser or ModelParser()

    # statuses/public_timeline
    public_timeline = bind_api(
        path='/statuses/public_timeline.json',
        payload_type='status', payload_list=True,
        allowed_param=[]
    )

    # statuses/home_timeline
    home_timeline = bind_api(
        path='/statuses/home_timeline.json',
        payload_type='status', payload_list=True,
        allowed_param=['since_id', 'max_id', 'count', 'page'],
        require_auth=True
    )

    # statuses/friends_timeline
    friends_timeline = bind_api(
        path='/statuses/friends_timeline.json',
        payload_type='status', payload_list=True,
        allowed_param=['since_id', 'max_id', 'count', 'page'],
        require_auth=True
    )

    # statuses/user_timeline
    user_timeline = bind_api(
        path='/statuses/user_timeline.json',
        payload_type='status', payload_list=True,
        allowed_param=['id', 'user_id', 'screen_name', 'since_id',
                       'max_id', 'count', 'page']
    )

    # statuses/mentions
    mentions = bind_api(
        path='/statuses/mentions.json',
        payload_type='status', payload_list=True,
        allowed_param=['since_id', 'max_id', 'count', 'page'],
        require_auth=True
    )

    # /statuses/:id/retweeted_by.format
    retweeted_by = bind_api(
        path='/statuses/{id}/retweeted_by.json',
        payload_type='status', payload_list=True,
        allowed_param=['id', 'count', 'page'],
        require_auth=True
    )

    # /statuses/:id/retweeted_by/ids.format
    retweeted_by_ids = bind_api(
        path='/statuses/{id}/retweeted_by/ids.json',
        payload_type='ids',
        allowed_param=['id', 'count', 'page'],
        require_auth=True
    )

    # statuses/retweeted_by_me
    retweeted_by_me = bind_api(
        path='/statuses/retweeted_by_me.json',
        payload_type='status', payload_list=True,
        allowed_param=['since_id', 'max_id', 'count', 'page'],
        require_auth=True
    )

    # statuses/retweeted_to_me
    retweeted_to_me = bind_api(
        path='/statuses/retweeted_to_me.json',
        payload_type='status', payload_list=True,
        allowed_param=['since_id', 'max_id', 'count', 'page'],
        require_auth=True
    )

    # statuses/retweets_of_me
    retweets_of_me = bind_api(
        path='/statuses/retweets_of_me.json',
        payload_type='status', payload_list=True,
        allowed_param=['since_id', 'max_id', 'count', 'page'],
        require_auth=True
    )

    # statuses/show
    get_status = bind_api(
        path='/statuses/show.json',
        payload_type='status',
        allowed_param=['id']
    )

    # statuses/update
    update_status = bind_api(
        path='/statuses/update.json',
        method='POST',
        payload_type='status',
        allowed_param=['status', 'in_reply_to_status_id', 'lat', 'long',
                       'source', 'place_id'],
        require_auth=True
    )

    # statuses/destroy
    destroy_status = bind_api(
        path='/statuses/destroy.json',
        method='DELETE',
        payload_type='status',
        allowed_param=['id'],
        require_auth=True
    )

    # statuses/retweet
    retweet = bind_api(
        path='/statuses/retweet/{id}.json',
        method='POST',
        payload_type='status',
        allowed_param=['id'],
        require_auth=True
    )

    # statuses/retweets
    retweets = bind_api(
        path='/statuses/retweets/{id}.json',
        payload_type='status', payload_list=True,
        allowed_param=['id', 'count'],
        require_auth=True
    )

    # users/show
    get_user = bind_api(
        path='/users/show.json',
        payload_type='user',
        allowed_param=['id', 'user_id', 'screen_name']
    )

    def lookup_users(self, user_ids=None, screen_names=None):
        """Perform bulk look up of users from user ID or screenname."""
        # NOTE(review): list_to_csv is not defined in the visible part of
        # this file; presumably it is declared elsewhere - confirm.
        return self._lookup_users(list_to_csv(user_ids),
                                  list_to_csv(screen_names))

    _lookup_users = bind_api(
        path='/users/lookup.json',
        payload_type='user', payload_list=True,
        allowed_param=['user_id', 'screen_name'],
        require_auth=True
    )

    def me(self):
        """Get the authenticated user."""
        return self.get_user(screen_name=self.auth.get_username())

    # users/search
    search_users = bind_api(
        path='/users/search.json',
        payload_type='user', payload_list=True,
        require_auth=True,
        allowed_param=['q', 'per_page', 'page']
    )

    # statuses/friends
    friends = bind_api(
        path='/statuses/friends.json',
        payload_type='user', payload_list=True,
        allowed_param=['id', 'user_id', 'screen_name', 'page', 'cursor']
    )

    # statuses/followers
    followers = bind_api(
        path='/statuses/followers.json',
        payload_type='user', payload_list=True,
        allowed_param=['id', 'user_id', 'screen_name', 'page', 'cursor']
    )

    # direct_messages
    direct_messages = bind_api(
        path='/direct_messages.json',
        payload_type='direct_message', payload_list=True,
        allowed_param=['since_id', 'max_id', 'count', 'page'],
        require_auth=True
    )

    # direct_messages/sent
    sent_direct_messages = bind_api(
        path='/direct_messages/sent.json',
        payload_type='direct_message', payload_list=True,
        allowed_param=['since_id', 'max_id', 'count', 'page'],
        require_auth=True
    )

    # direct_messages/new
    send_direct_message = bind_api(
        path='/direct_messages/new.json',
        method='POST',
        payload_type='direct_message',
        allowed_param=['user', 'screen_name', 'user_id', 'text'],
        require_auth=True
    )

    # direct_messages/destroy
    destroy_direct_message = bind_api(
        path='/direct_messages/destroy.json',
        method='DELETE',
        payload_type='direct_message',
        allowed_param=['id'],
        require_auth=True
    )

    # friendships/create
    create_friendship = bind_api(
        path='/friendships/create.json',
        method='POST',
        payload_type='user',
        allowed_param=['id', 'user_id', 'screen_name', 'follow'],
        require_auth=True
    )

    # friendships/destroy
    destroy_friendship = bind_api(
        path='/friendships/destroy.json',
        method='DELETE',
        payload_type='user',
        allowed_param=['id', 'user_id', 'screen_name'],
        require_auth=True
    )

    # friendships/exists
    exists_friendship = bind_api(
        path='/friendships/exists.json',
        payload_type='json',
        allowed_param=['user_a', 'user_b']
    )

    # friendships/show
    show_friendship = bind_api(
        path='/friendships/show.json',
        payload_type='friendship',
        allowed_param=['source_id', 'source_screen_name',
                       'target_id', 'target_screen_name']
    )

    # friends/ids
    friends_ids = bind_api(
        path='/friends/ids.json',
        payload_type='ids',
        allowed_param=['id', 'user_id', 'screen_name', 'cursor']
    )

    # friendships/incoming
    friendships_incoming = bind_api(
        path='/friendships/incoming.json',
        payload_type='ids',
        allowed_param=['cursor']
    )

    # friendships/outgoing
    friendships_outgoing = bind_api(
        path='/friendships/outgoing.json',
        payload_type='ids',
        allowed_param=['cursor']
    )

    # followers/ids
    followers_ids = bind_api(
        path='/followers/ids.json',
        payload_type='ids',
        allowed_param=['id', 'user_id', 'screen_name', 'cursor']
    )

    def verify_credentials(self):
        """account/verify_credentials: return the user, or False on TweepError."""
        try:
            return bind_api(
                path='/account/verify_credentials.json',
                payload_type='user',
                require_auth=True
            )(self)
        except TweepError:
            return False

    # account/rate_limit_status
    rate_limit_status = bind_api(
        path='/account/rate_limit_status.json',
        payload_type='json'
    )

    # account/update_delivery_device
    set_delivery_device = bind_api(
        path='/account/update_delivery_device.json',
        method='POST',
        allowed_param=['device'],
        payload_type='user',
        require_auth=True
    )

    # account/update_profile_colors
    update_profile_colors = bind_api(
        path='/account/update_profile_colors.json',
        method='POST',
        payload_type='user',
        allowed_param=['profile_background_color', 'profile_text_color',
                       'profile_link_color', 'profile_sidebar_fill_color',
                       'profile_sidebar_border_color'],
        require_auth=True
    )

    def update_profile_image(self, filename):
        """account/update_profile_image: upload *filename* (700kb limit)."""
        headers, post_data = API._pack_image(filename, 700)
        return bind_api(
            path='/account/update_profile_image.json',
            method='POST',
            payload_type='user',
            require_auth=True
        )(self, post_data=post_data, headers=headers)

    def update_profile_background_image(self, filename, *args, **kargs):
        """account/update_profile_background_image: upload *filename*.

        The limit is 800kb. Extra arguments (the 'tile' parameter) are
        forwarded to the request, and the parsed response is returned.
        """
        headers, post_data = API._pack_image(filename, 800)
        kargs['post_data'] = post_data
        kargs['headers'] = headers
        return bind_api(
            path='/account/update_profile_background_image.json',
            method='POST',
            payload_type='user',
            allowed_param=['tile'],
            require_auth=True
        )(self, *args, **kargs)

    # account/update_profile
    update_profile = bind_api(
        path='/account/update_profile.json',
        method='POST',
        payload_type='user',
        allowed_param=['name', 'url', 'location', 'description'],
        require_auth=True
    )

    # favorites
    favorites = bind_api(
        path='/favorites.json',
        payload_type='status', payload_list=True,
        allowed_param=['id', 'page']
    )

    # favorites/create
    create_favorite = bind_api(
        path='/favorites/create/{id}.json',
        method='POST',
        payload_type='status',
        allowed_param=['id'],
        require_auth=True
    )

    # favorites/destroy
    destroy_favorite = bind_api(
        path='/favorites/destroy/{id}.json',
        method='DELETE',
        payload_type='status',
        allowed_param=['id'],
        require_auth=True
    )

    # notifications/follow
    enable_notifications = bind_api(
        path='/notifications/follow.json',
        method='POST',
        payload_type='user',
        allowed_param=['id', 'user_id', 'screen_name'],
        require_auth=True
    )

    # notifications/leave
    disable_notifications = bind_api(
        path='/notifications/leave.json',
        method='POST',
        payload_type='user',
        allowed_param=['id', 'user_id', 'screen_name'],
        require_auth=True
    )

    # blocks/create
    create_block = bind_api(
        path='/blocks/create.json',
        method='POST',
        payload_type='user',
        allowed_param=['id', 'user_id', 'screen_name'],
        require_auth=True
    )

    # blocks/destroy
    destroy_block = bind_api(
        path='/blocks/destroy.json',
        method='DELETE',
        payload_type='user',
        allowed_param=['id', 'user_id', 'screen_name'],
        require_auth=True
    )

    def exists_block(self, *args, **kargs):
        """blocks/exists: True when the request succeeds, False on TweepError."""
        try:
            bind_api(
                path='/blocks/exists.json',
                allowed_param=['id', 'user_id', 'screen_name'],
                require_auth=True
            )(self, *args, **kargs)
        except TweepError:
            return False
        return True

    # blocks/blocking
    blocks = bind_api(
        path='/blocks/blocking.json',
        payload_type='user', payload_list=True,
        allowed_param=['page'],
        require_auth=True
    )

    # blocks/blocking/ids
    blocks_ids = bind_api(
        path='/blocks/blocking/ids.json',
        payload_type='json',
        require_auth=True
    )

    # report_spam
    report_spam = bind_api(
        path='/report_spam.json',
        method='POST',
        payload_type='user',
        allowed_param=['id', 'user_id', 'screen_name'],
        require_auth=True
    )

    # saved_searches
    saved_searches = bind_api(
        path='/saved_searches.json',
        payload_type='saved_search', payload_list=True,
        require_auth=True
    )

    # saved_searches/show
    get_saved_search = bind_api(
        path='/saved_searches/show/{id}.json',
        payload_type='saved_search',
        allowed_param=['id'],
        require_auth=True
    )

    # saved_searches/create
    create_saved_search = bind_api(
        path='/saved_searches/create.json',
        method='POST',
        payload_type='saved_search',
        allowed_param=['query'],
        require_auth=True
    )

    # saved_searches/destroy
    destroy_saved_search = bind_api(
        path='/saved_searches/destroy/{id}.json',
        method='DELETE',
        payload_type='saved_search',
        allowed_param=['id'],
        require_auth=True
    )

    def test(self):
        """help/test: True when the request succeeds, False on TweepError."""
        try:
            bind_api(
                path='/help/test.json',
            )(self)
        except TweepError:
            return False
        return True

    def create_list(self, *args, **kargs):
        return bind_api(
            path='/%s/lists.json' % self.auth.get_username(),
            method='POST',
            payload_type='list',
            allowed_param=['name', 'mode', 'description'],
            require_auth=True
        )(self, *args, **kargs)

    def destroy_list(self, slug):
        return bind_api(
            path='/%s/lists/%s.json' % (self.auth.get_username(), slug),
            method='DELETE',
            payload_type='list',
            require_auth=True
        )(self)

    def update_list(self, slug, *args, **kargs):
        return bind_api(
            path='/%s/lists/%s.json' % (self.auth.get_username(), slug),
            method='POST',
            payload_type='list',
            allowed_param=['name', 'mode', 'description'],
            require_auth=True
        )(self, *args, **kargs)

    lists = bind_api(
        path='/{user}/lists.json',
        payload_type='list', payload_list=True,
        allowed_param=['user', 'cursor'],
        require_auth=True
    )

    lists_memberships = bind_api(
        path='/{user}/lists/memberships.json',
        payload_type='list', payload_list=True,
        allowed_param=['user', 'cursor'],
        require_auth=True
    )

    lists_subscriptions = bind_api(
        path='/{user}/lists/subscriptions.json',
        payload_type='list', payload_list=True,
        allowed_param=['user', 'cursor'],
        require_auth=True
    )

    list_timeline = bind_api(
        path='/{owner}/lists/{slug}/statuses.json',
        payload_type='status', payload_list=True,
        allowed_param=['owner', 'slug', 'since_id', 'max_id',
                       'per_page', 'page']
    )

    get_list = bind_api(
        path='/{owner}/lists/{slug}.json',
        payload_type='list',
        allowed_param=['owner', 'slug']
    )

    def add_list_member(self, slug, *args, **kargs):
        return bind_api(
            path='/%s/%s/members.json' % (self.auth.get_username(), slug),
            method='POST',
            payload_type='list',
            allowed_param=['id'],
            require_auth=True
        )(self, *args, **kargs)

    def remove_list_member(self, slug, *args, **kargs):
        return bind_api(
            path='/%s/%s/members.json' % (self.auth.get_username(), slug),
            method='DELETE',
            payload_type='list',
            allowed_param=['id'],
            require_auth=True
        )(self, *args, **kargs)

    list_members = bind_api(
        path='/{owner}/{slug}/members.json',
        payload_type='user', payload_list=True,
        allowed_param=['owner', 'slug', 'cursor']
    )

    def is_list_member(self, owner, slug, user_id):
        """Return the member's user object, or False on TweepError."""
        try:
            return bind_api(
                path='/%s/%s/members/%s.json' % (owner, slug, user_id),
                payload_type='user'
            )(self)
        except TweepError:
            return False

    subscribe_list = bind_api(
        path='/{owner}/{slug}/subscribers.json',
        method='POST',
        payload_type='list',
        allowed_param=['owner', 'slug'],
        require_auth=True
    )

    unsubscribe_list = bind_api(
        path='/{owner}/{slug}/subscribers.json',
        method='DELETE',
        payload_type='list',
        allowed_param=['owner', 'slug'],
        require_auth=True
    )

    list_subscribers = bind_api(
        path='/{owner}/{slug}/subscribers.json',
        payload_type='user', payload_list=True,
        allowed_param=['owner', 'slug', 'cursor']
    )

    def is_subscribed_list(self, owner, slug, user_id):
        """Return the subscriber's user object, or False on TweepError."""
        try:
            return bind_api(
                path='/%s/%s/subscribers/%s.json' % (owner, slug, user_id),
                payload_type='user'
            )(self)
        except TweepError:
            return False

    # trends/available
    trends_available = bind_api(
        path='/trends/available.json',
        payload_type='json',
        allowed_param=['lat', 'long']
    )

    # trends/location
    trends_location = bind_api(
        path='/trends/{woeid}.json',
        payload_type='json',
        allowed_param=['woeid']
    )

    # search
    search = bind_api(
        search_api=True,
        path='/search.json',
        payload_type='search_result', payload_list=True,
        allowed_param=['q', 'lang', 'locale', 'rpp', 'page', 'since_id',
                       'geocode', 'show_user', 'max_id', 'since', 'until',
                       'result_type']
    )
    search.pagination_mode = 'page'

    # trends
    trends = bind_api(
        path='/trends.json',
        payload_type='json'
    )

    # trends/current
    trends_current = bind_api(
        path='/trends/current.json',
        payload_type='json',
        allowed_param=['exclude']
    )

    # trends/daily
    trends_daily = bind_api(
        path='/trends/daily.json',
        payload_type='json',
        allowed_param=['date', 'exclude']
    )

    # trends/weekly
    trends_weekly = bind_api(
        path='/trends/weekly.json',
        payload_type='json',
        allowed_param=['date', 'exclude']
    )

    # geo/reverse_geocode
    reverse_geocode = bind_api(
        path='/geo/reverse_geocode.json',
        payload_type='json',
        allowed_param=['lat', 'long', 'accuracy', 'granularity',
                       'max_results']
    )

    # geo/nearby_places
    nearby_places = bind_api(
        path='/geo/nearby_places.json',
        payload_type='json',
        allowed_param=['lat', 'long', 'ip', 'accuracy', 'granularity',
                       'max_results']
    )

    # geo/id
    geo_id = bind_api(
        path='/geo/id/{id}.json',
        payload_type='json',
        allowed_param=['id']
    )

    # Internal use only
    @staticmethod
    def _pack_image(filename, max_size):
        """Pack image from file into multipart-formdata post body.

        *max_size* is the size limit in kilobytes. Returns
        ``(headers, body)``, where body is a byte string. Raises TweepError
        when the file is too big, cannot be accessed, or is not a gif, jpeg
        or png.
        """
        # image must not exceed max_size kilobytes
        try:
            if os.path.getsize(filename) > (max_size * 1024):
                raise TweepError(
                    'File is too big, must be less than %skb.' % max_size)
        except os.error:
            raise TweepError('Unable to access file')

        # image must be gif, jpeg, or png
        # guess_type() returns a (type, encoding) tuple; only the type
        # itself can be None.
        file_type = mimetypes.guess_type(filename)[0]
        if file_type is None:
            raise TweepError('Could not determine file type')
        if file_type not in ['image/gif', 'image/jpeg', 'image/png']:
            raise TweepError('Invalid file type for image: %s' % file_type)

        with open(filename, 'rb') as fp:
            image_data = fp.read()

        # build the multipart-formdata body
        BOUNDARY = 'Tw3ePy'
        parts = [
            '--' + BOUNDARY,
            'Content-Disposition: form-data; name="image"; filename="%s"'
            % filename,
            'Content-Type: %s' % file_type,
            '',
            image_data,
            '--' + BOUNDARY + '--',
            '',
        ]
        # The image is binary, so the text parts are encoded before joining.
        body = b'\r\n'.join(
            [part if isinstance(part, bytes) else part.encode('utf-8')
             for part in parts])

        # build headers
        headers = {
            'Content-Type': 'multipart/form-data; boundary=Tw3ePy',
            'Content-Length': len(body)
        }

        return headers, body
len(body) } return headers, body class AuthHandler(object): def apply_auth(self, url, method, headers, parameters): """Apply authentication headers to request""" raise NotImplementedError def get_username(self): """Return the username of the authenticated user""" raise NotImplementedError class OAuthConsumer(object): """Consumer of OAuth authentication. OAuthConsumer is a data type that represents the identity of the Consumer via its shared secret with the Service Provider. """ key = None secret = None def __init__(self, key, secret): self.key = key self.secret = secret class OAuthSignatureMethod(object): """A strategy class that implements a signature method.""" def get_name(self): """-> str.""" raise NotImplementedError def build_signature_base_string(self, oauth_request, oauth_consumer, oauth_token): """-> str key, str raw.""" raise NotImplementedError def build_signature(self, oauth_request, oauth_consumer, oauth_token): """-> str.""" raise NotImplementedError def check_signature(self, oauth_request, consumer, token, signature): built = self.build_signature(oauth_request, consumer, token) return built == signature class OAuthToken(object): """OAuthToken is a data type that represents an End User via either an access or request token. key -- the token secret -- the token secret """ key = None secret = None callback = None callback_confirmed = None verifier = None def __init__(self, key, secret): self.key = key self.secret = secret def set_callback(self, callback): self.callback = callback self.callback_confirmed = 'true' def set_verifier(self, verifier=None): if verifier is not None: self.verifier = verifier else: self.verifier = generate_verifier() def get_callback_url(self): if self.callback and self.verifier: # Append the oauth_verifier. 
parts = urlparse.urlparse(self.callback) scheme, netloc, path, params, query, fragment = parts[:6] if query: query = '%s&oauth_verifier=%s' % (query, self.verifier) else: query = 'oauth_verifier=%s' % self.verifier return urlparse.urlunparse((scheme, netloc, path, params, query, fragment)) return self.callback def to_string(self): data = { 'oauth_token': self.key, 'oauth_token_secret': self.secret, } if self.callback_confirmed is not None: data['oauth_callback_confirmed'] = self.callback_confirmed return urllib.urlencode(data) def from_string(s): """ Returns a token from something like: oauth_token_secret=xxx&oauth_token=xxx """ params = cgi.parse_qs(s, keep_blank_values=False) key = params['oauth_token'][0] secret = params['oauth_token_secret'][0] token = OAuthToken(key, secret) try: token.callback_confirmed = params['oauth_callback_confirmed'][0] except KeyError: pass # 1.0, no callback confirmed. return token from_string = staticmethod(from_string) def __str__(self): return self.to_string() class OAuthSignatureMethod_HMAC_SHA1(OAuthSignatureMethod): def get_name(self): return 'HMAC-SHA1' def build_signature_base_string(self, oauth_request, consumer, token): sig = ( escape(oauth_request.get_normalized_http_method()), escape(oauth_request.get_normalized_http_url()), escape(oauth_request.get_normalized_parameters()), ) key = '%s&' % escape(consumer.secret) if token: key += escape(token.secret) raw = '&'.join(sig) return key, raw def build_signature(self, oauth_request, consumer, token): """Builds the base signature string.""" key, raw = self.build_signature_base_string(oauth_request, consumer, token) # HMAC object. try: import hashlib # 2.5 hashed = hmac.new(key, raw, hashlib.sha1) except: import sha # Deprecated hashed = hmac.new(key, raw, sha) # Calculate the digest base 64. return binascii.b2a_base64(hashed.digest())[:-1] class OAuthRequest(object): """OAuthRequest represents the request and can be serialized. 
OAuth parameters: - oauth_consumer_key - oauth_token - oauth_signature_method - oauth_signature - oauth_timestamp - oauth_nonce - oauth_version - oauth_verifier ... any additional parameters, as defined by the Service Provider. """ parameters = None # OAuth parameters. http_method = HTTP_METHOD http_url = None version = VERSION def __init__(self, http_method=HTTP_METHOD, http_url=None, parameters=None): self.http_method = http_method self.http_url = http_url self.parameters = parameters or {} def set_parameter(self, parameter, value): self.parameters[parameter] = value def get_parameter(self, parameter): try: return self.parameters[parameter] except: raise OAuthError('Parameter not found: %s' % parameter) def _get_timestamp_nonce(self): return self.get_parameter('oauth_timestamp'), self.get_parameter( 'oauth_nonce') def get_nonoauth_parameters(self): """Get any non-OAuth parameters.""" parameters = {} for k, v in self.parameters.iteritems(): # Ignore oauth parameters. if k.find('oauth_') < 0: parameters[k] = v return parameters def to_header(self, realm=''): """Serialize as a header for an HTTPAuth request.""" auth_header = 'OAuth realm="%s"' % realm # Add the oauth parameters. if self.parameters: for k, v in self.parameters.iteritems(): if k[:6] == 'oauth_': auth_header += ', %s="%s"' % (k, escape(str(v))) return {'Authorization': auth_header} def to_postdata(self): """Serialize as post data for a POST request.""" return '&'.join(['%s=%s' % (escape(str(k)), escape(str(v))) \ for k, v in self.parameters.iteritems()]) def to_url(self): """Serialize as a URL for a GET request.""" return '%s?%s' % (self.get_normalized_http_url(), self.to_postdata()) def get_normalized_parameters(self): """Return a string that contains the parameters that must be signed.""" params = self.parameters try: # Exclude the signature if it exists. del params['oauth_signature'] except: pass # Escape key values before sorting. 
key_values = [(escape(_utf8_str(k)), escape(_utf8_str(v))) \ for k,v in params.items()] # Sort lexicographically, first after key, then after value. key_values.sort() # Combine key value pairs into a string. return '&'.join(['%s=%s' % (k, v) for k, v in key_values]) def get_normalized_http_method(self): """Uppercases the http method.""" return self.http_method.upper() def get_normalized_http_url(self): """Parses the URL and rebuilds it to be scheme://host/path.""" parts = urlparse.urlparse(self.http_url) scheme, netloc, path = parts[:3] # Exclude default port numbers. if scheme == 'http' and netloc[-3:] == ':80': netloc = netloc[:-3] elif scheme == 'https' and netloc[-4:] == ':443': netloc = netloc[:-4] return '%s://%s%s' % (scheme, netloc, path) def sign_request(self, signature_method, consumer, token): """Set the signature parameter to the result of build_signature.""" # Set the signature method. self.set_parameter('oauth_signature_method', signature_method.get_name()) # Set the signature. self.set_parameter('oauth_signature', self.build_signature(signature_method, consumer, token)) def build_signature(self, signature_method, consumer, token): """Calls the build signature method within the signature method.""" return signature_method.build_signature(self, consumer, token) def from_request(http_method, http_url, headers=None, parameters=None, query_string=None): """Combines multiple parameter sources.""" if parameters is None: parameters = {} # Headers if headers and 'Authorization' in headers: auth_header = headers['Authorization'] # Check that the authorization header is OAuth. if auth_header[:6] == 'OAuth ': auth_header = auth_header[6:] try: # Get the parameters from the header. header_params = OAuthRequest._split_header(auth_header) parameters.update(header_params) except: raise OAuthError('Unable to parse OAuth parameters from ' 'Authorization header.') # GET or POST query string. 
if query_string: query_params = OAuthRequest._split_url_string(query_string) parameters.update(query_params) # URL parameters. param_str = urlparse.urlparse(http_url)[4] # query url_params = OAuthRequest._split_url_string(param_str) parameters.update(url_params) if parameters: return OAuthRequest(http_method, http_url, parameters) return None from_request = staticmethod(from_request) def from_consumer_and_token(oauth_consumer, token=None, callback=None, verifier=None, http_method=HTTP_METHOD, http_url=None, parameters=None): if not parameters: parameters = {} defaults = { 'oauth_consumer_key': oauth_consumer.key, 'oauth_timestamp': generate_timestamp(), 'oauth_nonce': generate_nonce(), 'oauth_version': OAuthRequest.version, } defaults.update(parameters) parameters = defaults if token: parameters['oauth_token'] = token.key if token.callback: parameters['oauth_callback'] = token.callback # 1.0a support for verifier. if verifier: parameters['oauth_verifier'] = verifier elif callback: # 1.0a support for callback in the request token request. parameters['oauth_callback'] = callback return OAuthRequest(http_method, http_url, parameters) from_consumer_and_token = staticmethod(from_consumer_and_token) def from_token_and_callback(token, callback=None, http_method=HTTP_METHOD, http_url=None, parameters=None): if not parameters: parameters = {} parameters['oauth_token'] = token.key if callback: parameters['oauth_callback'] = callback return OAuthRequest(http_method, http_url, parameters) from_token_and_callback = staticmethod(from_token_and_callback) def _split_header(header): """Turn Authorization: header into parameters.""" params = {} parts = header.split(',') for param in parts: # Ignore realm parameter. if param.find('realm') > -1: continue # Remove whitespace. param = param.strip() # Split key-value. param_parts = param.split('=', 1) # Remove quotes and unescape the value. 
params[param_parts[0]] = urllib.unquote(param_parts[1].strip('\"')) return params _split_header = staticmethod(_split_header) def _split_url_string(param_str): """Turn URL string into parameters.""" parameters = cgi.parse_qs(param_str, keep_blank_values=False) for k, v in parameters.iteritems(): parameters[k] = urllib.unquote(v[0]) return parameters _split_url_string = staticmethod(_split_url_string) class OAuthHandler(AuthHandler): """OAuth authentication handler""" OAUTH_HOST = 'twitter.com' OAUTH_ROOT = '/oauth/' def __init__(self, consumer_key, consumer_secret, callback=None, secure=False): self._consumer = OAuthConsumer(consumer_key, consumer_secret) self._sigmethod = OAuthSignatureMethod_HMAC_SHA1() self.request_token = None self.access_token = None self.callback = callback self.username = None self.secure = secure def _get_oauth_url(self, endpoint, secure=False): if self.secure or secure: prefix = 'https://' else: prefix = 'http://' return prefix + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint def apply_auth(self, url, method, headers, parameters): request = OAuthRequest.from_consumer_and_token( self._consumer, http_url=url, http_method=method, token=self.access_token, parameters=parameters ) request.sign_request(self._sigmethod, self._consumer, self.access_token) headers.update(request.to_header()) def _get_request_token(self): try: url = self._get_oauth_url('request_token') request = OAuthRequest.from_consumer_and_token( self._consumer, http_url=url, callback=self.callback ) request.sign_request(self._sigmethod, self._consumer, None) resp = urlopen(Request(url, headers=request.to_header())) return OAuthToken.from_string(resp.read()) except Exception, e: raise TweepError(e) def set_request_token(self, key, secret): self.request_token = OAuthToken(key, secret) def set_access_token(self, key, secret): self.access_token = OAuthToken(key, secret) def get_authorization_url(self, signin_with_twitter=False): """Get the authorization URL to redirect the user""" try: 
# get the request token self.request_token = self._get_request_token() # build auth request and return as url if signin_with_twitter: url = self._get_oauth_url('authenticate') else: url = self._get_oauth_url('authorize') request = OAuthRequest.from_token_and_callback( token=self.request_token, http_url=url ) return request.to_url() except Exception, e: raise TweepError(e) def get_access_token(self, verifier=None): """ After user has authorized the request token, get access token with user supplied verifier. """ try: url = self._get_oauth_url('access_token') # build request request = OAuthRequest.from_consumer_and_token( self._consumer, token=self.request_token, http_url=url, verifier=str(verifier) ) request.sign_request(self._sigmethod, self._consumer, self.request_token) # send request resp = urlopen(Request(url, headers=request.to_header())) self.access_token = OAuthToken.from_string(resp.read()) return self.access_token except Exception, e: raise TweepError(e) def get_xauth_access_token(self, username, password): """ Get an access token from an username and password combination. In order to get this working you need to create an app at http://twitter.com/apps, after that send a mail to api@twitter.com and request activation of xAuth for it. 
""" try: url = self._get_oauth_url('access_token', secure=True) # must use HTTPS request = OAuthRequest.from_consumer_and_token( oauth_consumer=self._consumer, http_method='POST', http_url=url, parameters = { 'x_auth_mode': 'client_auth', 'x_auth_username': username, 'x_auth_password': password } ) request.sign_request(self._sigmethod, self._consumer, None) resp = urlopen(Request(url, data=request.to_postdata())) self.access_token = OAuthToken.from_string(resp.read()) return self.access_token except Exception, e: raise TweepError(e) def get_username(self): if self.username is None: api = API(self) user = api.verify_credentials() if user: self.username = user.screen_name else: raise TweepError("Unable to get username, invalid oauth token!") return self.username CONSUMER_KEY = '' CONSUMER_SECRET = '' ACCESS_KEY = '' ACCESS_SECRET = '' auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET) auth.set_access_token(ACCESS_KEY, ACCESS_SECRET) api = API(auth) #use api.update_status() to update your status