Wikipediaを検索してくれるTwitterロボットユーザ

ずっと使ってなかったここを使うか。


Pythonの練習をかねて、Twitterのダイレクトメッセージを受け取り、Wikipediaを調べて返信するロボットユーザを作ってみました。下にソースを載せます。
Python2.4+elementtree or Python2.5で動いてます。


実際のロボットはこちら。
http://twitter.com/twipediajp
使うには、上のユーザをaddした後、反対にaddされるのを待ってください。add通知が来たら「d twipediajp 調べたい言葉」のようにDMを送信します。


http://wikipedia.simpleapi.net/を利用しています。過負荷にならないよう、1分に30回までにしてローカルにキャッシュもしています。


思ったこと。

  • DMのやりとりをするには互いにaddしてないといけない。addされた時に自動でaddしかえす良い方法はないものか
  • ていうかDMじゃなくて公開のやり取りのほうがいいのかなー
  • デコレータ関数にクロージャを書いてみたら面倒くさかった(Pythonには完全なレキシカルスコープがないため)


twitterdict.py

# coding: utf-8
import urllib, urllib2, time, worddict

twitter_user = "twipediajp"
twitter_password = ""

recv_url = "http://twitter.com/direct_messages.xml"
send_url = "http://twitter.com/direct_messages/new.xml"

import bsddb
cache = bsddb.hashopen("dictcache.db", "c")

def get_direct_messages(since=None):
  try:
    from xml.etree import ElementTree
  except:
    from elementtree import ElementTree
  url = recv_url
  if since:
    url += "?since=" + urllib.quote_plus(
           time.strftime("%a, %d %b %Y %H:%M:%S +GMT", since))
  page = send_with_basic_auth(url)
  xml = page.read()
  if not xml: return []
  messages = []
  root = ElementTree.fromstring(xml)
  for elm in root.findall("direct_message"):
    user = elm.findtext("sender_screen_name")
    message = elm.findtext("text")
    messages.append((user, message))
  return messages

def send_direct_message(user, message):
  params = urllib.urlencode({"user": user.encode("UTF-8"),
                             "text": message.encode("UTF-8")})
  page = send_with_basic_auth(send_url, params)
  print page.read()

def send_with_basic_auth(url, params=None):
  auth_handler = urllib2.HTTPBasicAuthHandler()
  auth_handler.add_password("Twitter API", "http://twitter.com", twitter_user, twitter_password)
  opener = urllib2.build_opener(auth_handler)
  urllib2.install_opener(opener)
  if params:
    return urllib2.urlopen(url, params)
  else:
    return urllib2.urlopen(url)

def serve(since=None):
  messages = get_direct_messages(since)
  next = time.gmtime()
  for message in messages:
    key = message[1].encode("UTF-8")
    if cache.has_key(key):
      resp = cache[key]
    else:
      try:
        time.sleep(1)
        resp = worddict.get_dict(message[1], 140)
        cache[key] = resp.encode("UTF-8")
        cache.sync()
      except:
        resp = "Sorry, I'm busy now ..."
    send_direct_message(message[0], resp)
  return next

def main():
  import pickle
  try:
    f = file("since", "r")
    since = pickle.load(f)
    f.close()
  except:
    since = None
  while True:
    since = serve(since)
    f = file("since", "w")
    pickle.dump(since, f)
    f.close()
    time.sleep(60)

if __name__ == "__main__":
  main()

worddict.py

# coding: utf-8
import myutil

@myutil.check_count_in_time(limit=30, t=60)
def fetch_xml(keyword):
  import urllib
  page = urllib.urlopen("http://wikipedia.simpleapi.net/api?output=xml&keyword=" +
                        urllib.quote_plus(keyword.encode("UTF-8")))
  return page.read()

def get_article(xml, keyword):
  try:
    from xml.etree import ElementTree
  except:
    from elementtree import ElementTree
  root = ElementTree.fromstring(xml)
  elms = root.findall("result")
  for elm in elms:
    if (elm.findtext("title") == keyword):
      return (elm.findtext("url"), elm.findtext("body"))
  if 0 < len(elms): return (elms[0].findtext("url"), elms[0].findtext("body"))
  return None

def get_summary(article, keyword, maxlen=150):
  if not article: return "I don't know " + keyword + "."
  url = article[0]
  string = article[1]
  urllen = len(url) + 2
  if maxlen < len(string) + urllen:
    string = string[:min(len(string),maxlen-urllen-4)] + " ..."
  idx = string.rfind(u"。")
  if idx != -1:
    string = string[:idx+1]
  else:
    marks = [u"の意味", u"のこと"]
    for mark in marks:
      idx = string.find(mark)
      if idx != -1: return string[0:idx+len(mark)] + "(" + url + ")"
  return string + "(" + url + ")"

def get_dict(keyword, maxlen=150):
  xml = fetch_xml(keyword)
  article = get_article(xml, keyword);
  return get_summary(article, keyword, maxlen)

myutil.py

# coding: utf-8

class MutableNumber:
  def __init__(self, num):
    self.num = num
  def set(self, num):
    self.num = num
  def get(self):
    return self.num
  def inc(self):
    self.num += 1
  def dec(self):
    self.num -= 1

class TimeCountError(Exception):
  def __init__(self, value):
    self.value = value
  def __str__(self):
    return repr(self.value)

def check_count_in_time(limit, t):
  import time
  def director(func):
    last = MutableNumber(0)
    count = MutableNumber(0)
    def check(*args, **kwargs):
      if last.get() == 0 or last.get() + t < time.time():
        last.set(time.time())
        count.set(0)
      if count.get() < limit:
        count.inc()
        return func(*args, **kwargs)
      else:
        raise TimeCountError("count " + str(limit) + " over " + 
                             " in " + str(t) + " seconds.")
    check.func_name = func.func_name
    return check
  return director