以前、Google App Engine上で使える便利クラスを作りましたが、ツイートする際にstatusesUpdateメソッドにaccess_token_secretとaccess_tokenを設定する必要があったので、下のように__init__のタイミングだけで設定すればOKな形に修正しました。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from google.appengine.api import urlfetch
import datetime
import time
import random
import urllib
import urllib2
import hmac
import hashlib
import json
import logging


class Twitter(object):
    """Small Twitter client for Google App Engine using OAuth 1.0a (HMAC-SHA1).

    The access token and its secret are supplied once to the constructor and
    reused by every later call such as ``statusesUpdate``.

    ``self.prms`` holds only the long-lived OAuth parameters (consumer key,
    signature method, version and, when known, ``oauth_token``).  Values that
    belong to a single request (nonce, timestamp, signature, ``status``,
    ``media_ids``, ``oauth_callback``) are computed per request inside
    ``_request`` and never stored, so one instance can safely issue several
    requests in a row.
    """

    # Alphabet a nonce is drawn from (same characters the class always used).
    _NONCE_CHARS = ("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
                    "123456789[]{}!$%&'()-^\\:;*+><")
    _NONCE_LENGTH = 20

    def __init__(self, consumer_secret, consumer_key,
                 access_token_secret=None, access_token=None):
        """Store the credentials and the endpoint URLs.

        Args:
            consumer_secret: application (consumer) secret.
            consumer_key: application (consumer) key.
            access_token_secret: user's access token secret; ``None`` is
                treated as an empty string (used while no token exists yet).
            access_token: user's access token; when given it is sent as
                ``oauth_token`` with every request.
        """
        self.consumer_secret = consumer_secret
        self.access_token_secret = (
            access_token_secret if access_token_secret is not None else "")

        self.request_token_url = "https://api.twitter.com/oauth/request_token"
        self.oauth_authenticate_url = "https://api.twitter.com/oauth/authenticate?oauth_token="
        self.access_token_url = "https://api.twitter.com/oauth/access_token"
        self.statuses_update_url = "https://api.twitter.com/1.1/statuses/update.json"
        self.media_upload_url = "https://upload.twitter.com/1.1/media/upload.json"
        self.help_configuration_url = "https://api.twitter.com/1.1/help/configuration.json"

        # The nonce/timestamp stored here are only initial values; _request
        # replaces them with fresh ones for every request it sends.
        self.prms = {
            "oauth_consumer_key": consumer_key,
            "oauth_nonce": Twitter._nonce(),
            "oauth_signature_method": 'HMAC-SHA1',
            "oauth_version": '1.0',
            "oauth_timestamp": Twitter._timeStamp(),
        }
        if access_token is not None:
            self.prms["oauth_token"] = access_token

    @classmethod
    def _timeStamp(cls):
        """Return the current Unix time in whole seconds as a string."""
        return str(int(time.time()))

    @classmethod
    def _nonce(cls):
        """Return a random 20-character string drawn from ``_NONCE_CHARS``."""
        return "".join(random.choice(cls._NONCE_CHARS)
                       for _ in range(cls._NONCE_LENGTH))

    @staticmethod
    def _quote(value):
        """Percent-encode ``value`` the way OAuth 1.0a expects.

        ``unicode`` input is encoded as UTF-8 first and other non-string
        values are converted with ``str``.  Only ``~`` is passed as an extra
        safe character because Python 2's ``urllib.quote`` would otherwise
        escape it, while OAuth treats it as unreserved.
        """
        if isinstance(value, unicode):
            value = value.encode("utf-8")
        elif not isinstance(value, str):
            value = str(value)
        return urllib.quote(value, "~")

    @staticmethod
    def _parseResponse(body):
        """Split a ``k1=v1&k2=v2`` response body into a dict.

        Segments without ``=`` are ignored.  Values are returned as received
        (no percent-decoding), as this class has always done.
        """
        result = {}
        for pair in body.split("&"):
            key, sep, value = pair.partition("=")
            if sep:
                result[key] = value
        return result

    def _signature(self, request_url, method, prms=None):
        """Return the base64 HMAC-SHA1 signature for one request.

        Args:
            request_url: URL being requested.
            method: HTTP method name, e.g. ``"POST"``.
            prms: parameters to sign; defaults to ``self.prms``.  An
                ``oauth_signature`` entry is never part of the signed data.

        Returns:
            The signature as a base64 string without trailing newline.
        """
        signed = dict(self.prms if prms is None else prms)
        # A signature left over from an earlier request must not be signed.
        signed.pop("oauth_signature", None)

        pairs = "&".join(key + "=" + self._quote(signed[key])
                         for key in sorted(signed))
        base_string = "&".join([self._quote(method),
                                self._quote(request_url),
                                self._quote(pairs)])
        signing_key = "%s&%s" % (self._quote(self.consumer_secret),
                                 self._quote(self.access_token_secret))
        digest = hmac.new(signing_key, base_string, hashlib.sha1).digest()
        return digest.encode("base64").strip()

    def getRedirectOauthAuthenticateUrl(self):
        """Return the authenticate URL for the current ``oauth_token``.

        Returns ``None`` when no ``oauth_token`` is known yet.
        """
        token = self.prms.get("oauth_token")
        if token is None:
            return None
        return self.oauth_authenticate_url + token

    def requestAccessToken(self, oauth_token, oauth_verifier):
        """POST to the access-token URL and return the parsed response.

        ``oauth_token`` is stored in ``self.prms`` and therefore used for this
        and all later requests.

        Returns:
            A dict of the response's ``key=value`` pairs, or the falsy
            response (``None`` on a non-200 status, ``""`` on an empty body).
        """
        self.prms["oauth_token"] = oauth_token
        # NOTE(review): oauth_verifier is sent in the body only and is not
        # part of the signed parameters; this matches the previous behaviour
        # of this class - confirm against the OAuth spec before changing.
        response = self._request(self.access_token_url,
                                 {"oauth_verifier": oauth_verifier}, "POST")
        if response:
            return self._parseResponse(response)
        return response

    def requestOAuthToken(self, oauth_callback_url):
        """POST to the request-token URL and remember the returned token.

        Returns:
            ``True`` when the response reports
            ``oauth_callback_confirmed=true`` and carries an ``oauth_token``
            (which is then stored in ``self.prms``); ``False`` otherwise.
        """
        callback = {"oauth_callback": oauth_callback_url}
        response = self._request(self.request_token_url, callback, "POST",
                                 signed_prms=callback)
        if response is None:
            return False
        result = self._parseResponse(response)
        if result.get("oauth_callback_confirmed") != "true":
            return False
        token = result.get("oauth_token")
        if not token:
            return False
        self.prms["oauth_token"] = token
        return True

    def _request(self, request_url, request_prms, method, signed_prms=None):
        """Sign and send one request through ``urlfetch``.

        Args:
            request_url: URL to fetch.
            request_prms: dict sent form-encoded as the request payload.
            method: ``"POST"`` selects ``urlfetch.POST``; anything else
                selects ``urlfetch.GET``.
            signed_prms: optional dict of extra parameters that are signed
                and placed in the ``Authorization`` header for this request
                only (callers used to write these into ``self.prms``).

        Returns:
            The response body when the status code is 200, otherwise ``None``
            (status and body are logged).
        """
        # Work on a copy so nothing request-specific leaks into self.prms.
        oauth = dict(self.prms)
        oauth.pop("oauth_signature", None)
        if signed_prms:
            oauth.update(signed_prms)
        # Every request gets its own nonce and timestamp.
        oauth["oauth_nonce"] = Twitter._nonce()
        oauth["oauth_timestamp"] = Twitter._timeStamp()
        oauth["oauth_signature"] = self._signature(request_url, method, oauth)

        authorization = "OAuth " + ",".join(
            key + "=\"" + self._quote(oauth[key]) + "\""
            for key in sorted(oauth))
        payload = "&".join(key + "=" + self._quote(request_prms[key])
                           for key in request_prms)
        headers = {
            'Content-Type': "application/x-www-form-urlencoded",
            "Authorization": authorization,
        }
        if method == 'POST':
            fetch_method = urlfetch.POST
        else:
            fetch_method = urlfetch.GET

        result = urlfetch.fetch(
            url=request_url,
            payload=payload,
            method=fetch_method,
            headers=headers
        )
        if result.status_code == 200:
            return result.content
        logging.info(result.status_code)
        logging.info(result.content)
        return None

    def helpConfiguration(self):
        """GET the help/configuration URL.

        Returns:
            The JSON-decoded response, or ``None`` when the request failed.
        """
        response = self._request(self.help_configuration_url, {}, "GET")
        if response is not None:
            response = json.loads(response)
        return response

    def statusesUpdate(self, status, media_ids=""):
        """POST a status update.

        Args:
            status: text of the status.
            media_ids: optional media ids; omitted from the request when it
                equals ``""``.

        Returns:
            The raw response body on success, ``None`` otherwise.
        """
        prms = {"status": status}
        if media_ids != "":
            prms["media_ids"] = media_ids
        # The parameters are signed and, as before, also repeated in the
        # Authorization header; they are no longer kept in self.prms.
        return self._request(self.statuses_update_url, prms, "POST",
                             signed_prms=prms)
0 コメント:
コメントを投稿