Wikipediaを検索してくれるTwitterロボットユーザ
ずっと使ってなかったここを使うか。
Pythonの練習をかねて、Twitterのダイレクトメッセージを受け取り、Wikipediaを調べて返信するロボットユーザを作ってみました。下にソースを載せます。
Python2.4+elementtree or Python2.5で動いてます。
実際のロボットはこちら。
http://twitter.com/twipediajp
使うには、上のユーザをaddした後、反対にaddされるのを待ってください。add通知が来たら「d twipediajp 調べたい言葉」のようにDMを送信します。
http://wikipedia.simpleapi.net/を利用しています。過負荷にならないよう、1分に30回までにしてローカルにキャッシュもしています。
思ったこと。
- DMのやりとりをするには互いにaddしてないといけない。addされた時に自動でaddしかえす良い方法はないものか
- ていうかDMじゃなくて公開のやり取りのほうがいいのかなー
- デコレータ関数にクロージャを書いてみたら面倒くさかった(Pythonには完全なレキシカルスコープがないため)
twitterdict.py
# coding: utf-8 import urllib, urllib2, time, worddict twitter_user = "twipediajp" twitter_password = "" recv_url = "http://twitter.com/direct_messages.xml" send_url = "http://twitter.com/direct_messages/new.xml" import bsddb cache = bsddb.hashopen("dictcache.db", "c") def get_direct_messages(since=None): try: from xml.etree import ElementTree except: from elementtree import ElementTree url = recv_url if since: url += "?since=" + urllib.quote_plus( time.strftime("%a, %d %b %Y %H:%M:%S +GMT", since)) page = send_with_basic_auth(url) xml = page.read() if not xml: return [] messages = [] root = ElementTree.fromstring(xml) for elm in root.findall("direct_message"): user = elm.findtext("sender_screen_name") message = elm.findtext("text") messages.append((user, message)) return messages def send_direct_message(user, message): params = urllib.urlencode({"user": user.encode("UTF-8"), "text": message.encode("UTF-8")}) page = send_with_basic_auth(send_url, params) print page.read() def send_with_basic_auth(url, params=None): auth_handler = urllib2.HTTPBasicAuthHandler() auth_handler.add_password("Twitter API", "http://twitter.com", twitter_user, twitter_password) opener = urllib2.build_opener(auth_handler) urllib2.install_opener(opener) if params: return urllib2.urlopen(url, params) else: return urllib2.urlopen(url) def serve(since=None): messages = get_direct_messages(since) next = time.gmtime() for message in messages: key = message[1].encode("UTF-8") if cache.has_key(key): resp = cache[key] else: try: time.sleep(1) resp = worddict.get_dict(message[1], 140) cache[key] = resp.encode("UTF-8") cache.sync() except: resp = "Sorry, I'm busy now ..." send_direct_message(message[0], resp) return next def main(): import pickle try: f = file("since", "r") since = pickle.load(f) f.close() except: since = None while True: since = serve(since) f = file("since", "w") pickle.dump(since, f) f.close() time.sleep(60) if __name__ == "__main__": main()
worddict.py
# coding: utf-8 import myutil @myutil.check_count_in_time(limit=30, t=60) def fetch_xml(keyword): import urllib page = urllib.urlopen("http://wikipedia.simpleapi.net/api?output=xml&keyword=" + urllib.quote_plus(keyword.encode("UTF-8"))) return page.read() def get_article(xml, keyword): try: from xml.etree import ElementTree except: from elementtree import ElementTree root = ElementTree.fromstring(xml) elms = root.findall("result") for elm in elms: if (elm.findtext("title") == keyword): return (elm.findtext("url"), elm.findtext("body")) if 0 < len(elms): return (elms[0].findtext("url"), elms[0].findtext("body")) return None def get_summary(article, keyword, maxlen=150): if not article: return "I don't know " + keyword + "." url = article[0] string = article[1] urllen = len(url) + 2 if maxlen < len(string) + urllen: string = string[:min(len(string),maxlen-urllen-4)] + " ..." idx = string.rfind(u"。") if idx != -1: string = string[:idx+1] else: marks = [u"の意味", u"のこと"] for mark in marks: idx = string.find(mark) if idx != -1: return string[0:idx+len(mark)] + "(" + url + ")" return string + "(" + url + ")" def get_dict(keyword, maxlen=150): xml = fetch_xml(keyword) article = get_article(xml, keyword); return get_summary(article, keyword, maxlen)
myutil.py
# coding: utf-8 class MutableNumber: def __init__(self, num): self.num = num def set(self, num): self.num = num def get(self): return self.num def inc(self): self.num += 1 def dec(self): self.num -= 1 class TimeCountError(Exception): def __init__(self, value): self.value = value def __str__(self): return repr(self.value) def check_count_in_time(limit, t): import time def director(func): last = MutableNumber(0) count = MutableNumber(0) def check(*args, **kwargs): if last.get() == 0 or last.get() + t < time.time(): last.set(time.time()) count.set(0) if count.get() < limit: count.inc() return func(*args, **kwargs) else: raise TimeCountError("count " + str(limit) + " over " + " in " + str(t) + " seconds.") check.func_name = func.func_name return check return director