前回、昔、書いた[Google App Engine][twitter][oauth]ログインを自作するから少し仕様が変わったみたいで、少し見やすくして、新しくクラスを作ってみたのですが、画像付きツイートするメソッドを追加しました。
#twitter.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from google.appengine.api import urlfetch
import datetime
import time
import random
import urllib
import urllib2
import hmac
import hashlib
import logging
import json
class Twitter(object):
def __init__(self,consumer_secret,consumer_key):
self.consumer_secret = consumer_secret
self.access_token_secret = ""
self.method = "POST"
self.request_token_url = "https://api.twitter.com/oauth/request_token"
self.oauth_authenticate_url = "https://api.twitter.com/oauth/authenticate?oauth_token="
self.access_token_url = "https://api.twitter.com/oauth/access_token"
self.statuses_update_url = "https://api.twitter.com/1.1/statuses/update.json"
self.media_upload_url = "https://upload.twitter.com/1.1/media/upload.json"
self.prms = {
"oauth_consumer_key":consumer_key,
"oauth_nonce":Twitter._nonce(),
"oauth_signature_method":'HMAC-SHA1',
"oauth_version":'1.0',
"oauth_timestamp":Twitter._timeStamp()
}
@classmethod
def _timeStamp(cls):
_d = datetime.datetime.today()
_d = time.mktime(_d.timetuple())
_d = str(int(_d))
return _d
@classmethod
def _nonce(cls):
_s = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz123456789[]{}!$%&'()-^\:;*+><"
_l = list(_s)
_n = ""
for i in range(20):
_x = random.randint(0,len(_l)-1)
_n += _l[_x]
return _n
def _signature(self,request_url):
_prms_keys = sorted(self.prms.keys())
_prms = ''
for i in _prms_keys:
_prms = _prms + "&" + i + "=" + urllib.quote(self.prms[i],"")
else:
_prms = _prms[1:]
_prms = urllib.quote_plus(self.method) + "&" + urllib.quote_plus(request_url) + "&" + urllib.quote_plus(_prms)
_h = hmac.new("%s&%s" % (urllib.quote_plus(self.consumer_secret), urllib.quote_plus(self.access_token_secret)), _prms, hashlib.sha1)
_sig = _h.digest().encode("base64").strip()
return _sig
def getOauthToken(self):
if self.prms.has_key("oauth_token"):
return self.prms["oauth_token"]
else:
return None
def getRedirectOauthAuthenticateUrl(self):
if self.prms.has_key("oauth_token"):
_url = self.oauth_authenticate_url + self.prms["oauth_token"]
return _url
else:
return None
def requestAccessToken(self,oauth_token,oauth_verifier):
self.prms["oauth_token"] = oauth_token
_response = self._request(self.access_token_url,{"oauth_verifier":oauth_verifier})
if _response:
_response = _response.split("&")
_result = {}
for i in _response:
key_value = i.split("=")
_result[key_value[0]] = key_value[1]
return _result
else:
return _response
#logging.info(_response)
def requestOAuthToken(self,oauth_callback_url):
self.prms["oauth_callback"] = oauth_callback_url
_response = self._request(self.request_token_url,{"oauth_callback":self.prms["oauth_callback"]})
if _response is None:
return False
_response = _response.split("&")
_result = {}
for i in _response:
key_value = i.split("=")
_result[key_value[0]] = key_value[1]
if _result["oauth_callback_confirmed"] == "true":
self.prms["oauth_token"] = _result["oauth_token"]
# #oauth_token = _response["oauth_token"]
return True
else:
return False
def _request(self,request_url,request_prms):
self.prms['oauth_signature'] = self._signature(request_url)
if self.method == 'POST':
_headers = {
'Content-Type': "application/x-www-form-urlencoded"
}
#logging.info(urllib.urlencode({"oauth_callback":self.prms["oauth_callback"]}))
_prms_keys = sorted(self.prms.keys())
_prms = ''
for i in _prms_keys:
_prms = _prms + "," + i + "=\"" + urllib.quote(self.prms[i],"") + "\""
else:
_prms = _prms[1:]
_prms = "OAuth " + _prms
payload = ""
for i in request_prms:
payload = payload + "&" + i + "=" + urllib.quote(request_prms[i],"")
else:
payload = payload[1:]
logging.info(_prms)
logging.info(payload)
logging.info(urllib.urlencode(request_prms))
_headers["Authorization"] = _prms
_result = urlfetch.fetch(
url = request_url,
payload = payload,
method = urlfetch.POST,
headers = _headers
)
logging.info(_result.status_code)
logging.info(_result.content)
if _result.status_code == 200:
return _result.content
else:
return None
def _boundary(self):
_s = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890"
_l = list(_s)
_n = ""
for i in range(15):
_x = random.randint(0,len(_l)-1)
_n += _l[_x]
_n = "----------" + _n
return _n
def mediaUpload(self,access_token,access_token_secret,media):
#status = u"test"
#status = status.encode('utf-8')
#_response = self.statusesUpdate(access_token,access_token_secret,status)
_request_url = self.media_upload_url
self.prms["oauth_token"] = access_token
self.access_token_secret = access_token_secret
self.prms['oauth_signature'] = self._signature(_request_url)
_prms_keys = sorted(self.prms.keys())
_prms = ''
for i in _prms_keys:
_prms = _prms + "," + i + "=\"" + urllib.quote(self.prms[i],"") + "\""
else:
_prms = _prms[1:]
_prms = "OAuth " + _prms
_boundary = self._boundary()
_request = urllib2.Request(_request_url)
_request.add_header("Authorization", _prms)
_request.add_header("Content-Type", "multipart/form-data; boundary=%s" % _boundary)
_data = []
_data.append("--"+_boundary)
_data.append("Content-Disposition: form-data; name=\"media\";")
_data.append("")
_data.append(media)
_data.append("--" + _boundary + "--")
_data.append("")
_result = urllib2.urlopen(_request,"\r\n".join(_data))
_response = _result.read()
json_dict = json.loads(_response)
#logging.info(json_dict)
#logging.info(json_dict["media_id"])
del self.prms['oauth_signature']
status = u" "
status = status.encode('utf-8')
_response = self.statusesUpdate(access_token,access_token_secret,status,json_dict["media_id_string"])
return _response
def statusesUpdate(self,access_token,access_token_secret,status,media_ids=""):
self.prms["oauth_token"] = access_token
self.access_token_secret = access_token_secret
self.prms["status"] = status
_prms = {}
_prms["status"] = status
if media_ids != "":
self.prms["media_ids"] = media_ids
_prms["media_ids"] = media_ids
#logging.info(self.access_token_secret)
_response = self._request(self.statuses_update_url,_prms)
return _response
0 コメント:
コメントを投稿