2017/05/12

[Google App Engine][Python]Twitterの共通classを修正しました

以前、Google App Engine上で使える便利クラスを作りましたがツイートする際にstatusesUpdateメソッドにaccess_token_secretとaccess_tokenを設定する必要があったので下のようにinitのタイミングだけ設定すればおkな形に修正をしました。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from google.appengine.api import urlfetch
import datetime
import time
import random
import urllib
import urllib2
import hmac
import hashlib
import json
import logging

class Twitter(object):
    def __init__(self,consumer_secret,consumer_key,access_token_secret=None,access_token=None):

        self.consumer_secret = consumer_secret

        self.access_token_secret = ""
        if access_token_secret is not None:
          self.access_token_secret = access_token_secret

        self.request_token_url = "https://api.twitter.com/oauth/request_token"
        self.oauth_authenticate_url = "https://api.twitter.com/oauth/authenticate?oauth_token="
        self.access_token_url = "https://api.twitter.com/oauth/access_token"
        self.statuses_update_url = "https://api.twitter.com/1.1/statuses/update.json"
        self.media_upload_url = "https://upload.twitter.com/1.1/media/upload.json"
        self.help_configuration_url = "https://api.twitter.com/1.1/help/configuration.json"

        self.prms = {
            "oauth_consumer_key":consumer_key,
            "oauth_nonce":Twitter._nonce(),
            "oauth_signature_method":'HMAC-SHA1',
            "oauth_version":'1.0',
            "oauth_timestamp":Twitter._timeStamp()
        }

        if access_token is not None:
          self.prms["oauth_token"] = access_token

    @classmethod
    def _timeStamp(cls):
        _d = datetime.datetime.today()
        _d = time.mktime(_d.timetuple())
        _d = str(int(_d))
        return _d

    @classmethod
    def _nonce(cls):
        _s = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz123456789[]{}!$%&'()-^\:;*+><"
        _l = list(_s)
        _n = ""
        for i in range(20):
            _x = random.randint(0,len(_l)-1)
            _n += _l[_x]
        return _n

    def _signature(self,request_url,method):
        _prms_keys = sorted(self.prms.keys())
        _prms = ''
        for i in _prms_keys:
            _prms = _prms + "&" + i + "=" + urllib.quote(self.prms[i],"")
        else:
            _prms = _prms[1:]

        _prms = urllib.quote_plus(method) + "&" + urllib.quote_plus(request_url) + "&" + urllib.quote_plus(_prms)
        _h = hmac.new("%s&%s" % (urllib.quote_plus(self.consumer_secret), urllib.quote_plus(self.access_token_secret)), _prms, hashlib.sha1)
        _sig = _h.digest().encode("base64").strip()
        return _sig

    def getRedirectOauthAuthenticateUrl(self):
        if self.prms.has_key("oauth_token"):
            _url = self.oauth_authenticate_url + self.prms["oauth_token"]
            return _url
        else:
            return None

    def requestAccessToken(self,oauth_token,oauth_verifier):
        self.prms["oauth_token"] = oauth_token
        _response = self._request(self.access_token_url,{"oauth_verifier":oauth_verifier},"POST")
        if _response:
            _response = _response.split("&")
            _result = {}
            for i in _response:
                key_value = i.split("=")
                _result[key_value[0]] = key_value[1]
            return _result
        else:
            return _response

    def requestOAuthToken(self,oauth_callback_url):
        self.prms["oauth_callback"] = oauth_callback_url
        _response = self._request(self.request_token_url,{"oauth_callback":self.prms["oauth_callback"]},"POST")
        if _response is None:
            return False

        _response = _response.split("&")
        _result = {}
        for i in _response:
            key_value = i.split("=")
            _result[key_value[0]] = key_value[1]


        if _result["oauth_callback_confirmed"] == "true":
            self.prms["oauth_token"] = _result["oauth_token"]
            return True
        else:
            return False

    def _request(self,request_url,request_prms,method):
        self.prms['oauth_signature'] = self._signature(request_url,method)

        _headers = {
          'Content-Type': "application/x-www-form-urlencoded"
        }

        _urlfetch_method = None
        if method == 'POST':
            _urlfetch_method = urlfetch.POST
        else:
            _urlfetch_method = urlfetch.GET

        _prms_keys = sorted(self.prms.keys())
        _prms = ''
        for i in _prms_keys:
            _prms = _prms + "," + i + "=\"" + urllib.quote(self.prms[i],"") + "\""
        else:
            _prms = _prms[1:]

        _prms = "OAuth " + _prms

        _payload = ""
        for i in request_prms:
            _payload = _payload + "&" + i + "=" + urllib.quote(request_prms[i],"")
        else:
            _payload = _payload[1:]

        _headers["Authorization"] = _prms
        _result = urlfetch.fetch(
            url = request_url,
            payload = _payload,
            method = _urlfetch_method,
            headers = _headers
        )

        if _result.status_code == 200:
            return _result.content
        else:
            logging.info(_result.status_code)
            logging.info(_result.content)
            return None

    def helpConfiguration(self):
        _prms = {}
        _response = self._request(self.help_configuration_url,_prms,"GET")
        if _response is not None:
            _response = json.loads(_response)
        return _response

    def statusesUpdate(self,status,media_ids=""):
        self.prms["status"] = status

        _prms = {}
        _prms["status"] = status
        if media_ids != "":
            self.prms["media_ids"] = media_ids
            _prms["media_ids"] = media_ids

        _response = self._request(self.statuses_update_url,_prms,"POST")
        return _response

0 コメント:

コメントを投稿