#! /usr/bin/env python # Twit: A Python-Twitter client Copyright 2009 by Akkana Peck. # Please share, modify and enjoy under the GPLv2 or higher. # 2do: # - integrate searches with regular timeline (diff color?) import twitter, Tkinter, tkMessageBox import simplejson, urllib, urllib2, datetime # For topic searches import os, sys, time, string, re # Set socket connection timeout shorter, since the window blocks # waiting for Twitter's sometimes very long timeouts: import socket socket.setdefaulttimeout(60) # Globals you can reset in ~/.twit num_entries = 11 sleep_seconds = 300 browser_str = "firefox %s" browser_newtab_str = "firefox -new-tab %s" hide_chars = "\t\n\r" # Chars some people put in tweets Debug = False # # Read username and password from ~/.twit # It should look like this: #username = "username" #passwd = "passwd" # Optionally you can also set num_entries and sleep_seconds, # browser_str and browser_newtab_str # execfile(os.path.join(os.environ["HOME"], ".twit")) class TwitWindow : """ Display Twitter output in a TkInter window.""" def __init__(self) : global username, password global sleep_seconds, browser_str, browser_newtab_str self.username = username self.password = password self.sleep_seconds = sleep_seconds self.browser_str = browser_str self.browser_newtab_str = browser_newtab_str self.tz_offset = 0 # Time zone offset if needed # colors -- these should be configurable in .twit too self.old_text_bg = "#cef" self.new_text_bg = "#eff" self.old_label_bg = "#068" self.new_label_bg = "#07c" self.err_label_bg = "#f66" self.err_text_bg = "#fee" self.me_label_bg = "#f66" self.me_text_bg = "#fee" def Retweet(self, i) : #print self.texts[i].get(1.0, Tkinter.END) s = "RT @" + self.screennames[i] + ": " + \ self.texts[i].get(1.0, Tkinter.END) # Unfortunately some tweets are in oddball charsets # that can't be passed through spawnlp, let alone printed: s = s.encode("utf-8", 'backslashreplace') print "Rewteeting:", s sys.stdout.flush() os.spawnlp(os.P_NOWAIT, "ptwit", "ptwit", s) def BlockUser(self, i) : blockuser = "@" + self.screennames[i] print "Adding", blockuser, "to block list" self.blocks.append(blockuser) self.blocklabel.config(text="Blocked: " + \ " ".join(self.blocks)) def ColorEntry(self, i, lfg, lbg, tfg, tbg) : self.texts[i].config(bg=tbg, fg=tfg) self.labels[i].config(bg=lbg, fg=lfg) self.RTbuttons[i].config(bg=lbg, fg=lfg) self.blockbuttons[i].config(bg=lbg, fg=lfg) def CreateWindow(self, noreplies, do_search, follow_conv) : global num_entries # Widgets self.labels = [] self.texts = [] self.RTbuttons = [] self.blockbuttons = [] self.modebutton = None # Button toggles between Replies and Timeline self.blocklabel = None # Label to show current blocks self.timer = None # The timeout used to call UpdateTimeline if do_search : self.window_title = "Twit " + do_search elif follow_conv : self.window_title = "Twit " + follow_conv else : self.window_title = "Twit" # Other stuff self.num_entries = num_entries self.ids = [] # unique id for each entry self.screennames = [] # current screen name for each entry self.blocks = [] # tags/screen names temporarily blocked # Create the main window self.tkroot = Tkinter.Tk() self.tkroot.title(self.window_title) # Create all the labels and text widgets: for i in range(0, self.num_entries) : self.screennames.append(None) hbox = Tkinter.Frame(self.tkroot) hbox.pack(expand=False, fill=Tkinter.X, pady=0) self.RTbuttons.append(Tkinter.Button(hbox, text="R", padx=2, pady=0, highlightthickness=0, command=lambda i=i: self.Retweet(i))) # http://bytes.com/topic/python/answers/536623-tkinter-button-command-lambda-argument-problem # for that lambda expression. self.RTbuttons[i].pack(expand=False, side=Tkinter.LEFT) self.blockbuttons.append(Tkinter.Button(hbox, text="X", padx=2, pady=0, highlightthickness=0, command=lambda i=i: self.BlockUser(i))) self.blockbuttons[i].pack(expand=False, side=Tkinter.RIGHT) self.labels.append(Tkinter.Label(hbox, textvariable=self.screennames[i])) self.labels[i].config(bg="#07c", fg="white") self.labels[i].pack(expand=False, fill=Tkinter.X) self.texts.append(Tkinter.Text()) self.texts[i].config(width=55, height=3) self.texts[i].config(bg="#eff", fg="black") self.texts[i].config(wrap=Tkinter.WORD) # Prepare for URLs self.texts[i].tag_config('link', foreground="blue") self.texts[i].tag_bind('link', '', self.showLink) self.texts[i].tag_bind('link', '', self.showLink) #self.texts[i].tag_bind('link', '', self.blockLink) self.texts[i].tag_bind("link", "", self.show_hand_cursor) self.texts[i].tag_bind("link", "", self.show_arrow_cursor) #self.texts[i].config(cursor="arrow") self.texts[i].pack(expand=True, fill=Tkinter.X) self.blocklabel = Tkinter.Label(self.tkroot, text="") self.blocklabel.pack(expand=True) # We need one single widget to tack the expose event onto: # the block label works as well as any. # Commented out until we do something useful with Expose. #self.blocklabel.bind("", self.expose_handler) self.modebutton = Tkinter.Button(self.tkroot) self.modebutton.pack(expand=False) # Calculate the timezone offset. # python-twitter 0.5 needs an offset; 0.6 and later don't. # Unfortunately there's no way to get the numeric version directly, # it has to be parsed from the __version__ string. try : version = float(twitter.__version__) except : # __version__ may be something like "0.6-devel" version = 0.6 if version > 0.5 : self.tz_offset = 0 else : # Can you believe the time module doesn't have a way to # calculate the current timezone (daylight or not) automatically? if time.daylight : self.tz_offset = time.altzone else : self.tz_offset = time.timezone self.api = twitter.Api(username=self.username, password=self.password) if do_search != None : self.SearchString(do_search) elif follow_conv : self.FollowConv(follow_conv) elif noreplies : self.UpdateTimeline() else : self.GetReplies() self.tkroot.mainloop() def FollowConv(self, follow_conv) : global Debug def GetReplies(self) : global Debug if self.timer : self.tkroot.after_cancel(self.timer) self.modebutton.config(text="Full Timeline", command=self.UpdateTimeline) try : if Debug : print "Getting replies..." statuses = self.api.GetReplies() if Debug : print "done." self.UpdateWindow(statuses) except Exception, e: self.ShowError("Can't get Twitter replies: " + str(e)) def UpdateTimeline(self) : global Debug if self.timer : self.tkroot.after_cancel(self.timer) self.modebutton.config(text="View Replies", command=self.GetReplies) try : if Debug : print "Updating... ", sys.stdout.flush() statuses = self.api.GetFriendsTimeline(self.username) # replies = self.api.GetReplies() messages = self.api.GetDirectMessages(self) if Debug : print "updated" sys.stdout.flush() # Merge the three timelines into one. # But replies show up in the regular stream; messages don't. si = 0 # ri = 0 mi = 0 while si < len(statuses) : sd = statuses[si].created_at_in_seconds while mi < len(messages) and \ messages[mi].created_at_in_seconds >= sd : tmpuser = twitter.User(messages[mi].sender_id, "Direct", messages[mi].sender_screen_name) messages[mi].user = tmpuser statuses.insert(si, messages[mi]) si += 1 mi += 1 # while ri < len(replies) and \ # replies[ri].created_at_in_seconds >= sd : # statuses.insert(si, replies[ri]) # ri += 1 # mi += 1 si += 1 self.UpdateWindow(statuses) # Update again after the specified timeout: self.timer = self.tkroot.after(self.sleep_seconds * 1000, self.UpdateTimeline) except Exception, e: # Use a shorter timeout -- Twitter errors are usually # just the site being flaky, and a few retries will do it. self.timer = self.tkroot.after(self.sleep_seconds * 300, self.UpdateTimeline) self.ShowError("Can't get Twitter timeline --" + str(e) + "-- will retry") def ShowError(self, msg) : print msg sys.stdout.flush() e = num_entries - 1 self.ColorEntry(e, "white", self.err_label_bg, "black", self.err_text_bg) self.labels[e].config(text="Error") self.texts[e].config(bg=self.err_text_bg, fg="black") self.texts[e].config(state=Tkinter.NORMAL) self.texts[e].delete(1.0, Tkinter.END) self.texts[e].insert(Tkinter.END, msg) self.texts[e].config(state=Tkinter.DISABLED) # And until I'm confident of that code: #tkMessageBox.showerror('Error!', msg) # Current Python-Twitter API doesn't support subject searches # (that may be coming in a future release) # but it's possible to do it with JSON (some code lifted from gwibber): def SearchString(self, searchstr) : global Debug class SearchResult (twitter.Status): def __init__(self, api, data) : d = datetime.datetime.strptime(data["created_at"], "%a, %d %b %Y %H:%M:%S +0000") #secs = time.mktime(d.timetuple()) if Debug : print "=============" print "Msg:", data["text"], print "User", data["from_user"], "id", data["from_user_id"] print "data", data #user = api.GetUser(data["from_user"]) user = twitter.User(0, data["from_user"], " ") if Debug : print "User", user sys.stdout.flush() timestr = d.strftime("%a %b %d %H:%M:%S +0000 %Y") # Create a twitter.Status object. # Since this json object doesn't have an ID, # use the time string as a poor-man's unique ID. # That way new messages will show up in the "new" color. super(SearchResult, self).__init__(created_at=timestr, id=timestr, text=data["text"], user=user) # Even though we passed in the user, somehow it doesn't # get initialized. So try again: self.user = user if Debug : print "\nCreated search result: status.user is" print self.user print "\nstatus.text is", self.text def get_search_data(query): global Debug if Debug : print "Getting search data ..." sys.stdout.flush() s = simplejson.loads(urllib2.urlopen( urllib2.Request("http://search.twitter.com/search.json", urllib.urlencode({"q": query}))).read()) if Debug : print "Got it!" sys.stdout.flush() return s def json_search(query): for data in get_search_data(query)["results"]: yield SearchResult(self.api, data) def json_search_url(query): urls = support.unshorten_url(query) for data in get_search_data(" OR ".join(urls))["results"]: if any(item in data["text"] for item in urls): yield SearchResult(self.api, data) # # Finally, the code to do the search and update the window: # if self.timer : self.tkroot.after_cancel(self.timer) self.modebutton.config(text="Full Timeline", command=self.UpdateTimeline) try : # Would like to wrap this in a try clause, since it can # fail if the network is down. But for some reason the # exception doesn't fire until later inside UpdateWindow. statuses = json_search(searchstr) self.UpdateWindow(statuses) # Run this again five minutes from now: self.timer = self.tkroot.after(self.sleep_seconds * 1000, self.SearchString, searchstr) except : print "Couldn't update search window" # Try again with a shorter timeout -- probably a net problem. self.timer = self.tkroot.after(self.sleep_seconds * 200, self.SearchString, searchstr) def UpdateWindow(self, statuses) : # # http://effbot.org/zone/re-sub.htm#unescape-html # def unescape(text): """ Removes HTML or XML character references and entities from a string. @param text The HTML (or XML) source text. @return The plain text, as a Unicode string, if necessary. """ import htmlentitydefs def fixup(m): text = m.group(0) if text[:2] == "&#": # character reference try: if text[:3] == "&#x": return unichr(int(text[3:-1], 16)) else: return unichr(int(text[2:-1])) except ValueError: pass else: # named entity try: text = unichr(htmlentitydefs.name2codepoint[text[1:-1]]) except KeyError: pass return text # leave as is return re.sub("&#?\w+;", fixup, text) def interval2str(secs) : secs = int(secs) if secs < 60 : return str(secs) + " seconds ago" mins = int(secs / 60) secs = secs % 60 if mins < 60 : return str(mins) + " min " + str(secs) + " sec ago" hours = int(mins / 60) mins = mins % 60 if (secs > 30) : mins += 1 if hours < 24 : return str(hours) + " hr " + str(mins) + " min ago" days = int(hours / 24) hours = hours % 24 if days == 1 : return "1 day " + str(hours) + " hr ago" else : return str(days) + " days " + str(hours) + " hr ago" # Find the next URL in the given string. # Return start and end points (character positions), or -1, -1. # http://mail.python.org/pipermail/python-list/2007-January/595436.html # Somewhat modified, e.g. remove \\(\\) urlfinders = [ # For some reason this first, simplified pattern works when # the final repeater is + (it matches all chars then), but # it doesn't work if it's * (matches none at all, stops with # the slash after the hostname). # And it still doesn't recognize a URL like http://usat.me?84865 # -- in fact, it won't match anything in the string # Neato! Quarters to recognize national parks http://usat.me?84865 blah blah # with or without the blah blah #re.compile("((news|telnet|nttp|file|http|ftp|https)://)([-A-Za-z0-9\\.]+)(:[0-9]*)?(/[-A-Za-z0-9_\\$\\.\\+\\!\\*,;:@&=\\?~\\#\\%]+)"), # Would also be nice to include something about not ending # with a period, to fix end-of-sentence issues. re.compile("([0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}|(((news|telnet|nttp|file|http|ftp|https)://)|(www|ftp)[-A-Za-z0-9]*\\.)[-A-Za-z0-9\\.]+)(:[0-9]*)?/[-A-Za-z0-9_\\$\\.\\+\\!\\*,;:@&=\\?/~\\#\\%]*[^]'\\.}>\\),\\\"]"), re.compile("([0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}|(((news|telnet|nttp|file|http|ftp|https)://)|(www|ftp)[-A-Za-z0-9]*\\.)[-A-Za-z0-9\\.]+)(:[0-9]*)?"), re.compile("[#@][-A-Za-z0-9_]+") ] def findURL(txt) : match = None for finder in urlfinders : newmatch = finder.search(txt) if not newmatch : continue # If this is the first match, or if this match is earlier # than the current one, save it if not match or newmatch.start() < match.start() : match = newmatch if not match : #print "No url in'" + txt.encode("utf-8") + "'" return -1, -1 #print "Found url '" + txt[match.start():match.end()] + "'", #print "in '" + txt.encode("utf-8") + "'" return match.start(), match.end() ################################################# # Finally, we begin the real UpdateWindow code # now = time.time() self.tkroot.title(self.window_title + " (" + time.ctime(now) + ")") now += self.tz_offset # How recent does a post need to be to be shown as new? # Eventually this may be set based on factors like # when we last got an expose event. last_time = now - self.sleep_seconds # Save the list of message IDs we saw last time. # Twitter sometimes posts things out of order, so a new # entry may show up anywhere. last_ids = self.ids self.ids = [] # Two loop variables: st is the status entry we got from Twitter, # while slot is the slot in the window. They're the same # if nothing is blocked, otherwise st > slot. slot = -1 for st in statuses : slot += 1 #print "Checking entry #", slot, ":", st #sys.stdout.flush() if slot >= self.num_entries : break self.texts[slot].config(state=Tkinter.NORMAL) self.texts[slot].delete(1.0, Tkinter.END) msg = st.text # If it's blocked, skip it if self.is_blocked(st.user.screen_name, msg) : print "BLOCKING entry from", st.user.name, ":", \ msg.encode("utf-8") slot -= 1 continue # ignore user.profile_image_url for now self.ids.append(st.id) self.screennames[slot] = st.user.screen_name # Set the background color based on whether it mentions us, # whether it's a new post and whether we saw it last time: if msg.find(self.username) >= 0 : self.ColorEntry(slot, "white", self.me_label_bg, "black", self.me_text_bg) elif st.user.name == "Direct" : self.ColorEntry(slot, "white", self.me_label_bg, "black", self.me_text_bg) elif not (st.id in last_ids) : # or st.created_at_in_seconds > last_time self.ColorEntry(slot, "white", self.new_label_bg, "black", self.new_text_bg) else : self.ColorEntry(slot, "white", self.old_label_bg, "black", self.old_text_bg) # Display the time elapsed since the post time: interval = now - st.created_at_in_seconds self.labels[slot].config(text=st.user.name + " (" + \ st.user.screen_name + "), " + \ interval2str(interval)) # Decode HTML entities msg = unescape(msg) # Get rid of characters people put in tweets that make # them hard to read: for ch in hide_chars : msg = msg.replace(ch, ' ') # Loop over text, linkifying URLs: while True : lstart, lend = findURL(msg) if lstart < 0 : # No more links self.texts[slot].insert(Tkinter.END, msg) break # Found a link. Linkify it self.texts[slot].insert(Tkinter.END, msg[0:lstart]) link = msg[lstart:lend] self.texts[slot].insert(Tkinter.END, link, ('link', link)) msg = msg[lend:] self.texts[slot].config(state=Tkinter.DISABLED) # If there were fewer statuses than there are slots, # zero out the remaining slots: for i in range(slot, self.num_entries) : self.labels[i].config(bg=self.old_text_bg, fg="white") self.labels[i].config(text="") self.texts[i].config(bg=self.old_text_bg, fg="black") self.texts[i].config(state=Tkinter.NORMAL) self.texts[i].delete(1.0, Tkinter.END) self.texts[slot].config(state=Tkinter.DISABLED) def is_blocked(self, user, msg) : for blocks in self.blocks : if blocks[0] == '@' : print "User", blocks[1:], "is blocked" if blocks[1:] == user : return True else : print user, "is not", blocks[1:] # If it doesn't start with @, just look for a string match: elif string.find(msg, blocks) >= 0 : return True # Didn't find any blocks return False # # Called when the user clicks on a link: # # This isn't documented anywhere, but you can get the modifier key # states on a ButtonPress event by masking these bits: # state = 0: no modifiers # state = 1: shift # state = 4: ctrl # state = 8: alt # I don't know what state=2 would mean. # def showLink(self, event) : # print "Event:", event.__dict__ url = event.widget.tag_names(Tkinter.CURRENT)[1] # # urls starting with # (tags) or @ (users) have two treatments: # Plain click loads their info in the browser, # while shift-click toggles blocking them # if url[0] == "#" or url[0] == "@" : if event.state & 1 > 0 : # Shift-click: block this link if url in self.blocks : #print url, "is already blocked; removing" self.blocks.remove(url) else : #print "Adding", url, "to block list" self.blocks.append(url) self.blocklabel.config(text="Blocked: " + \ " ".join(self.blocks)) return # clicking (no shift) on a tag means load a tag search in Firefox: # there's no Twitter API for that (at least not in python-twitter). # Reset the url and fall through. if url[0] == "#" : # tag url = "http://twitter.com/#search?q=" + url[1:] else : # user url = "http://twitter.com/" + url[1:] # and fall through to normal url handling cmd = None if event.num == 1 : # left click cmd = self.browser_str % url os.system("firefox " + url); elif event.num == 2 : # middle click cmd = self.browser_newtab_str % url if cmd : os.system(cmd); # Eventually, the expose handler will check the time and display # anything newer than the previous expose in the new colors. # For now, commented out. #def expose_handler(self, event) : # print "Expose" @staticmethod def show_hand_cursor(event) : event.widget.configure(cursor="hand1") @staticmethod def show_arrow_cursor(event): event.widget.configure(cursor="arrow") # Separate out all tags in the text message: @staticmethod def find_tags(txt) : tags = [] tagpat = re.compile("#[-A-Za-z0-9\\.]+") while True : match = tagpat.search(txt) if not match : return tags tags.append(txt[match.start():match.end()]) txt = txt[match.end():] print "find_tags: shouldn't ever get here!" return tags # main if __name__ == "__main__" : from optparse import OptionParser usage = "Usage: %prog" versionstr = "%prog 0.6: Twitter client.\n\ Copyright 2009 by Akkana Peck; share and enjoy under the GPL v.2 or later." parser = OptionParser(usage=usage, version=versionstr) parser.add_option("-s", "--search", metavar="#PATTERN", action="store", dest="do_search", help="Run a search for the given pattern (need not start with #)") parser.add_option("-n", "--noreplies", action="store_true", dest="noreplies", default=False, help="Start with timeline, not personal replies") parser.add_option("-c", "--conversation", metavar="user,user", action="store", dest="follow_conv", help="Show only tweets from a few users (during a conversation).\nDoesn't work yet.") (options, args) = parser.parse_args() #for arg in args : # no args, so skip this. win = TwitWindow() win.CreateWindow(options.noreplies, options.do_search, options.follow_conv)