以前書いた「[Google App Engine][twitter][oauth]ログインを自作する」の頃から仕様が少し変わったようなので、コードを少し見やすく整理して新しくクラスを作り、あわせて画像付きツイートをするメソッドを追加しました。
#twitter.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Small OAuth 1.0a client for the Twitter sign-in flow and (image) tweets."""
import base64
import datetime
import hashlib
import hmac
import json
import logging
import random
import string
import time
import urllib

try:  # Python 2
    import urllib2
    from urllib import quote as _url_quote
except ImportError:  # Python 3
    import urllib.request as urllib2
    from urllib.parse import quote as _url_quote

try:
    from google.appengine.api import urlfetch
except ImportError:
    # Not running on App Engine: Twitter._post falls back to urllib2.
    urlfetch = None

try:
    _text_type = unicode  # Python 2
except NameError:
    _text_type = str

_ALPHANUMERIC = string.ascii_letters + string.digits


def _native_str(value):
    """Return *value* converted to the interpreter's native ``str`` type."""
    if isinstance(value, str):
        return value
    if isinstance(value, bytes):
        return value.decode("utf-8")
    return value.encode("utf-8")


def _percent_encode(value):
    """Percent-encode *value*, leaving only unreserved characters as-is.

    Text is encoded as UTF-8 first so non-ASCII statuses do not raise, and
    ``~`` is kept literal (the Python 2 ``quote`` would otherwise escape it).
    """
    if isinstance(value, _text_type):
        value = value.encode("utf-8")
    elif not isinstance(value, bytes):
        value = str(value).encode("utf-8")
    return _url_quote(value, safe="~")


def _parse_form_encoded(body):
    """Split a ``key=value&key=value`` body into a dict of raw strings."""
    result = {}
    for pair in body.split("&"):
        if not pair:
            continue
        # partition() tolerates a pair that has no "=" at all.
        key, _, value = pair.partition("=")
        result[key] = value
    return result


class Twitter(object):
    """OAuth 1.0a helper bound to one consumer key/secret pair.

    ``prms`` holds the OAuth protocol parameters shared by every request
    (consumer key, signature method, version, current token).  Per-request
    parameters such as the tweet text are never stored in it.
    """

    def __init__(self, consumer_secret, consumer_key):
        """Store the application credentials.

        Note the argument order: the consumer *secret* comes first.
        """
        self.consumer_secret = consumer_secret
        self.access_token_secret = ""
        self.method = "POST"
        self.request_token_url = "https://api.twitter.com/oauth/request_token"
        self.oauth_authenticate_url = "https://api.twitter.com/oauth/authenticate?oauth_token="
        self.access_token_url = "https://api.twitter.com/oauth/access_token"
        self.statuses_update_url = "https://api.twitter.com/1.1/statuses/update.json"
        self.media_upload_url = "https://upload.twitter.com/1.1/media/upload.json"
        self.prms = {
            "oauth_consumer_key": consumer_key,
            "oauth_nonce": Twitter._nonce(),
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_version": "1.0",
            "oauth_timestamp": Twitter._timeStamp(),
        }

    @classmethod
    def _timeStamp(cls):
        """Return the current Unix time in whole seconds, as a string."""
        return str(int(time.time()))

    @classmethod
    def _nonce(cls):
        """Return a fresh 32-character alphanumeric nonce from the OS RNG."""
        rng = random.SystemRandom()
        return "".join(rng.choice(_ALPHANUMERIC) for _ in range(32))

    def _signature(self, request_url, request_prms=None):
        """Compute the HMAC-SHA1 signature for a request to *request_url*.

        The signature base string covers ``self.prms`` plus *request_prms*
        (the parameters sent outside ``self.prms``).  A leftover
        ``oauth_signature`` entry is ignored so an earlier signature can
        never be signed again.  Returns the base64 signature as ``str``.
        """
        signed = dict(
            (key, value) for key, value in self.prms.items()
            if key != "oauth_signature"
        )
        if request_prms:
            signed.update(request_prms)
        # Sorting happens on the *encoded* pairs, as the base string requires.
        encoded = sorted(
            (_percent_encode(key), _percent_encode(value))
            for key, value in signed.items()
        )
        prm_string = "&".join("%s=%s" % pair for pair in encoded)
        base_string = "&".join([
            _percent_encode(self.method),
            _percent_encode(request_url),
            _percent_encode(prm_string),
        ])
        signing_key = "%s&%s" % (
            _percent_encode(self.consumer_secret),
            _percent_encode(self.access_token_secret),
        )
        digest = hmac.new(
            signing_key.encode("ascii"),
            base_string.encode("ascii"),
            hashlib.sha1,
        ).digest()
        return _native_str(base64.b64encode(digest))

    def _authorizationHeader(self, request_url, request_prms=None, oauth_prms=None):
        """Build the ``Authorization: OAuth ...`` header value for one request.

        *request_prms* are signed but not placed in the header (they travel
        in the body); *oauth_prms* are extra protocol parameters that are
        both signed and placed in the header for this request only.
        """
        # Every request needs its own nonce and an up-to-date timestamp.
        self.prms.pop("oauth_signature", None)
        self.prms["oauth_nonce"] = Twitter._nonce()
        self.prms["oauth_timestamp"] = Twitter._timeStamp()

        extra = dict(oauth_prms or {})
        signed = dict(request_prms or {})
        signed.update(extra)

        header_prms = dict(self.prms)
        header_prms.update(extra)
        header_prms["oauth_signature"] = self._signature(request_url, signed)
        return "OAuth " + ",".join(
            '%s="%s"' % (_percent_encode(key), _percent_encode(header_prms[key]))
            for key in sorted(header_prms)
        )

    def getOauthToken(self):
        """Return the current ``oauth_token``, or ``None`` if none is set."""
        return self.prms.get("oauth_token")

    def getRedirectOauthAuthenticateUrl(self):
        """Return the authenticate URL for the current token, or ``None``."""
        token = self.prms.get("oauth_token")
        if token is None:
            return None
        return self.oauth_authenticate_url + token

    def requestAccessToken(self, oauth_token, oauth_verifier):
        """Exchange a request token and verifier for an access token.

        Returns the response fields as a dict of raw strings, or the falsy
        ``_request`` result (``None`` on a non-200 status) on failure.
        """
        self.prms["oauth_token"] = oauth_token
        response = self._request(self.access_token_url, {"oauth_verifier": oauth_verifier})
        if not response:
            return response
        return _parse_form_encoded(response)

    def requestOAuthToken(self, oauth_callback_url):
        """Obtain a request token; return ``True`` when Twitter confirms it.

        On success the token is stored in ``self.prms["oauth_token"]``.
        """
        # The callback is a per-request protocol parameter: it is signed and
        # sent in the header, but not kept for later requests.
        response = self._request(
            self.request_token_url, {}, {"oauth_callback": oauth_callback_url}
        )
        if response is None:
            return False
        fields = _parse_form_encoded(response)
        if fields.get("oauth_callback_confirmed") != "true" or "oauth_token" not in fields:
            return False
        self.prms["oauth_token"] = fields["oauth_token"]
        return True

    def _post(self, request_url, payload, headers):
        """POST *payload* and return ``(status_code, body)``.

        Uses App Engine's ``urlfetch`` when it could be imported and
        ``urllib2`` otherwise.
        """
        if urlfetch is not None:
            result = urlfetch.fetch(
                url=request_url,
                payload=payload,
                method=urlfetch.POST,
                headers=headers,
            )
            return result.status_code, result.content
        if isinstance(payload, _text_type):
            payload = payload.encode("utf-8")
        request = urllib2.Request(request_url, payload, headers)
        try:
            response = urllib2.urlopen(request)
        except urllib2.HTTPError as error:
            # Non-2xx answers become a status code, matching urlfetch.
            return error.code, error.read()
        try:
            return response.getcode(), response.read()
        finally:
            response.close()

    def _request(self, request_url, request_prms, oauth_prms=None):
        """Send a signed form-encoded POST.

        Returns the response body as ``str`` on HTTP 200 and ``None`` on any
        other status.
        """
        request_prms = request_prms or {}
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Authorization": self._authorizationHeader(
                request_url, request_prms, oauth_prms
            ),
        }
        payload = "&".join(
            "%s=%s" % (_percent_encode(key), _percent_encode(value))
            for key, value in request_prms.items()
        )
        status_code, content = self._post(request_url, payload, headers)
        # Only the status is logged on success: header and body carry secrets.
        logging.info("POST %s -> %s", request_url, status_code)
        if status_code != 200:
            logging.warning("Request to %s failed: %s", request_url, content)
            return None
        return _native_str(content)

    def _boundary(self):
        """Return a random multipart boundary (ten dashes + 15 characters)."""
        return "----------" + "".join(
            random.choice(_ALPHANUMERIC) for _ in range(15)
        )

    def mediaUpload(self, access_token, access_token_secret, media, status=u" "):
        """Upload *media* and post a tweet that has it attached.

        *media* is the raw image content as a byte string.  *status* is the
        tweet text and defaults to a single space.  Returns the
        ``statusesUpdate`` result.  Errors raised by ``urllib2.urlopen``
        during the upload propagate to the caller.
        """
        request_url = self.media_upload_url
        self.prms["oauth_token"] = access_token
        self.access_token_secret = access_token_secret

        boundary = self._boundary()
        request = urllib2.Request(request_url)
        # A multipart body is not part of the signature, so only the OAuth
        # parameters are signed here.
        request.add_header("Authorization", self._authorizationHeader(request_url))
        request.add_header("Content-Type", "multipart/form-data; boundary=%s" % boundary)

        delimiter = b"--" + boundary.encode("ascii")
        body = b"\r\n".join([
            delimiter,
            b'Content-Disposition: form-data; name="media";',
            b"",
            media,
            delimiter + b"--",
            b"",
        ])
        response = urllib2.urlopen(request, body)
        try:
            uploaded = json.loads(_native_str(response.read()))
        finally:
            response.close()
        return self.statusesUpdate(
            access_token, access_token_secret, status, uploaded["media_id_string"]
        )

    def statusesUpdate(self, access_token, access_token_secret, status, media_ids=""):
        """Post a tweet, optionally attaching already-uploaded media ids.

        Returns the response body on HTTP 200 and ``None`` otherwise.
        """
        self.prms["oauth_token"] = access_token
        self.access_token_secret = access_token_secret
        request_prms = {"status": status}
        if media_ids:
            request_prms["media_ids"] = media_ids
        return self._request(self.statuses_update_url, request_prms)
0 コメント:
コメントを投稿