ちょこちょこ修正を加えているAmazon Product Advertising APIを検索するPythonコードですが、以下のような形でbrowsenodelookupを検索するコードを追加しました。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from urlparse import urlparse
from google.appengine.api import urlfetch
from lxml import etree
import os
import urllib
import datetime
import hashlib
import base64
import logging
import hmac
import xmltodict
class AmazonProductAdvertisingAPI(object):
def __init__(self,access_key_id,secret_access_key,associate_tag = None,locale = "US"):
self._access_key_id = access_key_id
self._secret_access_key = secret_access_key
self._associate_tag = associate_tag
self._validate_certificate = True
_environ = os.environ
_http_host = _environ["HTTP_HOST"]
if _http_host.find("localhost") > -1:
self._validate_certificate = False
_wsgi_url_scheme = _environ["wsgi.url_scheme"]
self._endpoint = {
"JP":_wsgi_url_scheme + "://webservices.amazon.co.jp/onca/xml",
"US":_wsgi_url_scheme + "://webservices.amazon.com/onca/xml",
"FR":_wsgi_url_scheme + "://webservices.amazon.fr/onca/xml",
"IN":_wsgi_url_scheme + "://webservices.amazon.in/onca/xml",
"IT":_wsgi_url_scheme + "://webservices.amazon.it/onca/xml",
"UK":_wsgi_url_scheme + "://webservices.amazon.co.uk/onca/xml"
}
if self._endpoint.has_key(locale):
self._product_advertising_url = self._endpoint[locale]
else:
self._product_advertising_url = self._endpoint["US"]
self._service = "AWSECommerceService"
#self._method = ""
#self._prms = {}
#タイムスタンプを設定
def _timestamp(self,prms):
prms["Timestamp"] = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
#URLエンコード
def _urlencode_rfc3986(self,s):
_s = urllib.quote(s)
_s = _s.replace("%7E", "~")
return _s
#アクセストークン用の文字列作成
def _canonical_string(self,prms):
_keys = sorted(prms)
_canonical_string = ""
for _k in _keys:
_canonical_string = _canonical_string + "&" + self._urlencode_rfc3986(_k) + "=" + self._urlencode_rfc3986(prms[_k])
else:
_canonical_string = _canonical_string[1:]
return _canonical_string
#問い合わせURLの作成
def _request_url(self,prms,method):
_url_info = urlparse(self._product_advertising_url)
_canonical_string = self._canonical_string(prms)
_signature = method + "\n" + _url_info.hostname + "\n" + _url_info.path +"\n" + _canonical_string
_signature = hmac.new(self._secret_access_key, _signature, hashlib.sha256).digest()
_signature = base64.b64encode(_signature)
_url = self._product_advertising_url + "?" + _canonical_string + "&Signature=" + self._urlencode_rfc3986(_signature)
return _url
#itemsearch
def itemsearch(self,searchindex,browsenode=None,availability="Available",responsegroup="Small",itempage=1,keywords=None,minimumprice=None,maximumprice=None,isremoveadult=True,sort=None):
_prms = {}
_prms["Service"] = self._service
_prms["AWSAccessKeyId"] = self._access_key_id
if self._associate_tag is not None:
_prms["AssociateTag"] = self._associate_tag
_prms["Operation"] = "ItemSearch"
self._timestamp(_prms)
if availability is None:
return None
for i in ["Available"]:
if i == availability:
_prms["Availability"] = availability
break
else:
return None
if searchindex is None:
return None
if searchindex == u"":
return None
_prms["SearchIndex"] = searchindex
if responsegroup is None:
return None
if responsegroup == "":
return None
_prms["ResponseGroup"] = responsegroup
if browsenode is not None:
_prms["BrowseNode"] = browsenode
if itempage is not None:
_prms["ItemPage"] = str(itempage)
if keywords is not None:
#logging.info()
if isinstance(keywords,unicode):
#logging.info(keywords)
_prms["Keywords"] = keywords.encode('utf-8')
else:
_prms["Keywords"] = keywords
#_prms["Keywords"] = keywords
if maximumprice is not None:
_prms["MaximumPrice"] = str(maximumprice)
if minimumprice is not None:
_prms["MinimumPrice"] = str(minimumprice)
if sort is not None:
_prms["Sort"] = sort
_url = self._request_url(_prms,"GET")
#logging.info(_url)
_result = self._request(_url)
return _result
#itemlookup
def itemlookup(self,asin,responsegroup="Small"):
_prms = {}
_prms["Service"] = self._service
_prms["AWSAccessKeyId"] = self._access_key_id
if self._associate_tag is not None:
_prms["AssociateTag"] = self._associate_tag
_prms["Operation"] = "ItemLookup"
self._timestamp(_prms)
if asin is None:
return None
if asin == "":
return None
_prms["ItemId"] = asin
if responsegroup is None:
return None
if responsegroup == "":
return None
_prms["ResponseGroup"] = responsegroup
_url = self._request_url(_prms,"GET")
_result = self._request(_url)
return _result
def _request(self,url):
try:
_result = urlfetch.fetch(url,validate_certificate = self._validate_certificate)
_status_code = _result.status_code
#logging.info(_status_code)
if _status_code == 200:
_content = _result.content
_root = etree.fromstring(_content)
_ns = _root.xpath('namespace-uri(.)')
_namespace = {"ns":_ns}
_errorTags = _root.findall('.//ns:Error',namespaces=_namespace)
#logging.info(_content)
if len(_errorTags) > 0:
_errorTag = _errorTags[0]
_code = _errorTag.findall('.//ns:Code',namespaces=_namespace)
_message = _errorTag.findall('.//ns:Message',namespaces=_namespace)
logging.error(u"code: " + _code[0].text + " message: " + _message[0].text)
else:
return _root
else:
logging.error(u"status code: " + str(_status_code) + " content:" + _content)
except urlfetch.DeadlineExceededError:
logging.error(u"urlfetch DeadlineExceededError")
except urlfetch.ResponseTooLargeError, response:
logging.error(u"urlfetch ResponseTooLargeError" + response)
except urlfetch.InternalTransientError:
logging.error(u"urlfetch InternalTransientError")
except urlfetch.InvalidURLError:
logging.error(u"urlfetch InvalidURLError")
except urlfetch.Error:
logging.error(u"urlfetch Error")
return None
def browsenodelookup(self,browsenodeid):
_prms = {}
_prms["Service"] = self._service
_prms["AWSAccessKeyId"] = self._access_key_id
if browsenodeid is None or browsenodeid == "":
return None
if isinstance(browsenodeid,unicode):
_prms["BrowseNodeId"] = browsenodeid.encode('utf-8')
else:
_prms["BrowseNodeId"] = browsenodeid
if self._associate_tag is not None:
_prms["AssociateTag"] = self._associate_tag
_prms["Operation"] = "BrowseNodeLookup"
self._timestamp(_prms)
_url = self._request_url(_prms,"GET")
#logging.info(_url)
_result = self._request(_url)
return _result
0 コメント:
コメントを投稿