Tuesday, June 15, 2004
The Observer Pattern in Python
I find that the best way to get a deep understanding of a programming concept is to code it up yourself. Here's my take on the standard Observer Pattern. I wanted to fully understand it, because I expect to need it shortly. This also gave me the opportunity to learn about the handy-dandy weakref module.
This implementation is a bit different from the standard GoF one. For one thing, I wanted a clean way for an Observer to be able to observe multiple Observables, while being able to easily know which Observable was sending it an update notification. That's why, when registering an Observer with an Observable, the Observer tells the Observable the name of the Observer's function to call for the update notification.
Another difference has to do with the methods that the Observer base class provides. Now, strictly speaking from a Python perspective, the Observer base class is unnecessary. Any class that provides a method with the right signature can be used as an Observer. But when I was talking out this design in my head, I kept using phrases like "The observer tells the observable that it is interested in it." This really seemed like an action performed by the Observer, so I wanted that behaviour in an Observer base class.
The last big difference has to do with the way an Observer subscribes to, and unsubscribes from, update notifications from an Observable. The standard GoF implementation gives the Observable class 'attach' and 'detach' methods. I have an 'attach' method (named addObserver), and a 'detach' method (named removeObserver). But I'm also using weakref.WeakKeyDictionary as the way that an Observable keeps a list of its Observers. This means that the reference to the Observer held by the Observable does not keep the Observer alive if it goes out of scope or is deleted. The weakref to the Observer will be automatically deleted from the Observable's list (I know, a dictionary is not a list. I'm using the word 'list' here loosely.) So, the 'removeObserver' method is not strictly needed; just delete an Observer, and it gets removed from the list kept by any Observable it subscribed to.
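To see the weakref behaviour in isolation, here is a minimal sketch written for Python 3. The Watcher class is a made-up stand-in for an Observer, and the stored tuple only mimics the (callback name, events) pair described above.

```python
import gc
import weakref


class Watcher:
    """Hypothetical stand-in for an Observer; any weak-referenceable object works."""


registry = weakref.WeakKeyDictionary()

watcher = Watcher()
registry[watcher] = ("update", None)  # (callback name, events)
assert len(registry) == 1

del watcher    # drop the only strong reference to the key
gc.collect()   # not needed on CPython, but keeps the demo portable
assert len(registry) == 0
```

On CPython the entry disappears as soon as the last strong reference goes away; other interpreters may need the explicit collection.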
Here's the code. It should go in a file named observer.py:

    import weakref
    import types

    ##
    # The Observer Pattern in Python
    #
    # Design goals:
    #   1. An Observer should be able to observe multiple Observables.
    #   2. An Observer should be able to tell an Observable what kinds of
    #      events it is interested in observing.
    #   3. When an Observer is deleted, all Observables that it is observing
    #      should be notified to remove that Observer.
    #   4. When an Observer is notified of an event,
    #      it should be able to tell which Observable is calling it,
    #      and which event occurred.
    ##

    ##
    # The abstract Observable Class.
    #
    class Observable(object):
        def __init__(self):
            # A WeakKeyDictionary automatically removes a key when the
            # object used as that key gets deleted.  Thus, any Observers
            # which get deleted will be automatically removed from the
            # observers dictionary, which has two effects:
            #   We won't have references to zombie objects in the dictionary, and
            #   We won't have zombie objects, because the reference in this
            #   dictionary won't stay around, and so won't keep the deleted
            #   object alive.
            self._observers = weakref.WeakKeyDictionary()

        ##
        # Add an observer to this Observable's notification list.
        # @param observer The Observer to add.
        # @param cbname The name (as a string) of the Observer's
        # method to call for an event notification, or None for the default
        # "update" method.
        # @param events The events the Observer is interested in being
        # notified about.  None means all events.
        def addObserver(self, observer, cbname=None, events=None):
            if cbname is None:
                cbname = "update"
            if events is not None and type(events) not in (types.TupleType, types.ListType):
                events = (events,)
            self._observers[observer] = (cbname, events)

        ##
        # Remove an observer from this Observable's list of observers.
        # Note that this function is not strictly required, because when a
        # registered Observer is deleted, the weakref mechanism will cause
        # it to be removed from the notification list.
        # @param observer the Observer to remove.
        def removeObserver(self, observer):
            if observer in self._observers:
                del self._observers[observer]

        ##
        # Notify all currently-registered Observers.
        # Each observer must have a callback method (by default 'update'),
        # which should take three parameters (in addition to self): the
        # Observable, an event, and a message.
        # This method will be called if the event is one that the Observer
        # is interested in, or if event is 'None', or if the Observer is
        # interested in all events (it was registered with an event list
        # of 'None').
        # @param event The event to notify the Observers about.  None
        # means no specific event.
        # @param msg A reference to any data object that should be passed
        # to the Observers, or None.
        def notifyObservers(self, event=None, msg=None):
            for observer, data in self._observers.items():
                #print "data is", data
                cbname, events = data
                #print "cbname is", cbname
                #print "events is", events
                if events is None or event is None or event in events:
                    cb = getattr(observer, cbname, None)
                    if cb is None:
                        raise NotImplementedError, "Observer has no %s method." % cbname
                    cb(self, event, msg)

    ##
    # The abstract Observer Class
    # This class is a mix-in to add Observable registration methods to
    # a concrete Observer class.  It is not strictly required.
    #
    class Observer(object):
        ##
        # @param observable The Observable to observe.
        # @param cbname The name (as a string) of the method to call for
        # an event notification, or None for the default "update" method.
        # @param events The events this Observer is interested in being
        # notified about.  This should be a tuple or list of events.
        def __init__(self, observable=None, cbname=None, events=None):
            if (observable is not None):
                observable.addObserver(self, cbname, events)

        ##
        # Inform an Observable that you would like to be notified when
        # an interesting event occurs in the Observable.
        # @param observable The Observable this Observer would like
        # to observe.
        # @param cbname The name (as a string) of the Observer's
        # method to call for an event notification, or None for the default
        # "update" method.
        # @param events A tuple or list of events this Observer would like
        # to be notified of by the Observable, or None if it would like to
        # be notified of all events.
        def subscribeToObservable(self, observable, cbname=None, events=None):
            assert observable is not None, "Observable is None"
            observable.addObserver(self, cbname, events)

        ##
        # Inform an observable that this Observer is no longer interested in it.
        # Note that this function is not strictly required, because when a
        # registered Observer is deleted, the weakref mechanism will cause
        # it to be removed from the Observable's notification list.
        # Use this function when you want to unsubscribe an Observer
        # without deleting it.
        # @param observable The Observable that this Observer no longer wants
        # to observe.
        def unsubscribeToObservable(self, observable):
            assert observable is not None, "Observable is None"
            observable.removeObserver(self)
Here's the code for the unit tests, which also serve as usage examples. It should go in a file named test_observer.py:

    #! /usr/bin/env python

    import unittest
    from observer import Observable, Observer

    EVENT_FOO, EVENT_UPDATE = range(2)

    class Stuff(Observable):
        def __init__(self):
            Observable.__init__(self)
            self._data = None

        def setData(self, data):
            self._data = data
            self.notifyObservers(None, data)

        def setDataWithUpdateEvent(self, data):
            self._data = data
            self.notifyObservers(EVENT_UPDATE, data)

        def setDataWithFooEvent(self, data):
            self._data = data
            self.notifyObservers(EVENT_FOO, data)

    class StuffWatcher(Observer):
        def __init__(self, observable=None, cbname=None, events=None):
            Observer.__init__(self, observable, cbname, events)
            self._data = None
            self._updateData = None
            self._reportData = None

        def update(self, observable, event, msg):
            self._data = msg
            self._updateData = msg

        def report(self, observable, event, msg):
            self._data = msg
            self._reportData = msg

    class TestCase_01_Observer(unittest.TestCase):
        def test_01_noMethod(self):
            observable = Stuff()
            observer1 = StuffWatcher(observable, "foo")
            self.assertRaises(NotImplementedError, observable.setData, 10)

        def test_02_simple(self):
            observable = Stuff()
            observer1 = StuffWatcher(observable)
            observer2 = StuffWatcher(observable)
            self.assertEqual(len(observable._observers), 2)

            observable.setData(10)
            self.assertEqual(observer1._data, 10)
            self.assertEqual(observer2._data, 10)

            observable.setData(20)
            self.assertEqual(observer1._data, 20)
            self.assertEqual(observer2._data, 20)

            del observer1
            self.assertEqual(len(observable._observers), 1)
            observable.setData(30)
            self.assertEqual(observer2._data, 30)

            observer3 = StuffWatcher()
            self.assertEqual(len(observable._observers), 1)
            observer3.subscribeToObservable(observable)
            self.assertEqual(len(observable._observers), 2)
            observable.setData(40)
            self.assertEqual(observer2._data, 40)
            self.assertEqual(observer3._data, 40)

            del observer2
            self.assertEqual(len(observable._observers), 1)
            observable.setData(50)
            self.assertEqual(observer3._data, 50)

        def test_03_specificEvents(self):
            observable = Stuff()
            observer1 = StuffWatcher(observable, events=EVENT_UPDATE)
            observer2 = StuffWatcher(observable, events=EVENT_FOO)
            observable.setDataWithUpdateEvent(10)
            self.assertEqual(observer1._data, 10)
            self.assertEqual(observer2._data, None)

            observable = Stuff()
            observer1 = StuffWatcher(observable, events=(EVENT_UPDATE, EVENT_FOO))
            observable.setDataWithUpdateEvent(10)
            self.assertEqual(observer1._data, 10)
            observable.setDataWithFooEvent(20)
            self.assertEqual(observer1._data, 20)

        def test_04_multipleObservables(self):
            observable1 = Stuff()
            observable2 = Stuff()
            observer = StuffWatcher()
            observer.subscribeToObservable(observable1, "update", EVENT_UPDATE)
            observer.subscribeToObservable(observable2, "report", EVENT_FOO)
            self.assertEqual(len(observable1._observers), 1)
            self.assertEqual(len(observable2._observers), 1)

            observable1.setDataWithUpdateEvent(10)
            self.assertEqual(observer._data, 10)
            self.assertEqual(observer._updateData, 10)
            self.assertEqual(observer._reportData, None)

            observable1.setDataWithFooEvent(20)
            self.assertEqual(observer._data, 10)
            self.assertEqual(observer._updateData, 10)
            self.assertEqual(observer._reportData, None)

            observable2.setDataWithUpdateEvent(30)
            self.assertEqual(observer._data, 10)
            self.assertEqual(observer._updateData, 10)
            self.assertEqual(observer._reportData, None)

            observable2.setDataWithFooEvent(40)
            self.assertEqual(observer._data, 40)
            self.assertEqual(observer._updateData, 10)
            self.assertEqual(observer._reportData, 40)

    if __name__ == "__main__":
        unittest.main()
4:11:59 PM
Sunday, May 16, 2004
Rules for Refactoring Code
These are the rules I've developed over the years that I follow when refactoring code. They're rules in the sense that I always regret it if I fail to follow them for any reason.
- Unit Testing: Don't Even Start Without It. You won't know your refactoring didn't break the code unless you have unit test coverage of the affected code. If you don't have that coverage already, add new tests.
- Keep Each Refactoring Narrowly-Focused. Don't combine unrelated changes; refactor each one separately.
- Keep Your Unit Tests Fine-Grained. If a refactoring involves multiple functions, you're best off having unit tests for each function. If you depend on a single unit test for a function that then calls the refactored functions, a test failure leaves you not knowing which refactored function failed.
- Use Many Small Refactorings, Even When It Seems Inefficient. I prefer making a small refactoring even when I know that a subsequent refactoring will change that same code yet again. This is critical for refactoring tangled, poorly-written code.
- Unit Test Each Refactoring. Don't wait.
- Refactor Separate Functionality Into Separate Functions. If I encounter code that combines too much distinct functionality into one function, I try to break it apart into separate, individually-testable functions, with unit tests for each new function.
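As an illustration of the last rule combined with fine-grained tests, here is a small sketch in Python 3. The function names and the 'name=value' input format are invented for this example; they don't come from any real project.

```python
def parse_line(line):
    """Split one 'name=value' line into a (name, number) pair."""
    name, _, value = line.partition("=")
    return name.strip(), float(value)


def total(pairs):
    """Sum the numeric half of each (name, number) pair."""
    return sum(number for _, number in pairs)


def report_total(lines):
    """What used to be one do-everything function is now just glue."""
    return total(parse_line(line) for line in lines)


# One fine-grained check per extracted function, plus one for the glue,
# so a failure points at the function that actually broke.
assert parse_line(" apples = 3 ") == ("apples", 3.0)
assert total([("a", 1.0), ("b", 2.5)]) == 3.5
assert report_total(["a=1", "b=2.5"]) == 3.5
```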
5:47:12 PM
Thursday, May 13, 2004
Sun-Relative Time Events Using Python
For my current X10 home automation project, I want the ability to schedule events to occur at sun-relative times like 'sunrise' and 'sunset'. Now, you can do that by using an X10 light sensor and watching for the 'on' and 'off' commands it will send. But will the sensor be triggered by stray light, like from a passing car's headlights? I don't know. Perhaps with careful placement of the sensor, you could avoid accidental triggering. But there's another way to handle the problem.
Given your position on the Earth by latitude and longitude and today's date, sunrise and sunset can be calculated accurately using spherical geometry. It's what astronomers do. You can google around for existing software and source code that does these calculations. I wanted a Python solution, and so I settled for Sun.py by Henrik Härkönen. This is code for a Python class that calculates sunrise and sunset, as well as three flavors of 'twilight'. The code is a direct translation of someone else's C code, even to the point of preserving the original C comments, but hey, it works well. It calculates all times in UTC, so you'll need to apply the correct offset for your timezone.
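Applying the timezone offset can be sketched as follows in Python 3. This assumes the sunrise time arrives as decimal hours in UTC, which is an assumption about the calculation's output rather than something stated above; the function name and the sample values are made up.

```python
from datetime import date, datetime, timedelta, timezone


def utc_hours_to_local(day, utc_hours, utc_offset_hours):
    """Convert decimal hours UTC on `day` to an aware local datetime."""
    midnight_utc = datetime(day.year, day.month, day.day, tzinfo=timezone.utc)
    local_zone = timezone(timedelta(hours=utc_offset_hours))
    return (midnight_utc + timedelta(hours=utc_hours)).astimezone(local_zone)


# Made-up input: sunrise at 10.5 hours UTC, seen from a UTC-4 location.
sunrise = utc_hours_to_local(date(2004, 5, 13), 10.5, -4)
print(sunrise.strftime("%H:%M"))  # 06:30
```

A fixed offset like this ignores daylight saving changes, so a real scheduler would need the offset that applies on the date in question.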
With this code, I can now translate an X10 event scheduled for sunrise today into the actual sunrise time for today. That's one more item on my wish-list for X10 home automation I can check off.
11:31:38 AM
Monday, May 10, 2004
Python and X10 Home Automation, Part 1.1
I wrote previously about Project WiSH, which gives you access to X10 computer interface controllers via device drivers. I also wrote about upgrading my home system to Mandrake 10. Mandrake 10 has the new 2.6 kernel. Sadly, WiSH does not (yet) work with that kernel. So, I had to find an alternative.
From googling around, and comments from readers, I found the 'bottlerocket' software, for accessing the CM-17a 'Firecracker' X10 computer interface. This gives you a simple command-line program for issuing X10 commands via the CM-17a. It lacks the ability to watch for an X10 command coming in from a sensor, or querying the status of a status-capable X10 controller, so it's not a complete replacement for WiSH (but that functionality is not supported by the CM-17a anyway, so you don't get it with WiSH either), but it will do me for now.
When you are doing time-based automation under Linux, it makes sense to leverage Linux's built-in abilities. The 'cron' system lets you schedule tasks for execution based on date/time, either periodically on a period of your choosing, or at one particular date/time. The 'at' command lets you schedule a command for execution at one particular time in the future. For example, typing the following command at the Linux command-line prompt will schedule the running of the command '/usr/local/bin/br a2 on' for 8:00pm today ('br' is the bottlerocket command):

    at 8:00pm today <<EOF
    /usr/local/bin/br a2 on
    EOF
To test the usability of 'at' for X10 home automation, I wrote a simple shell script that issues two 'at' commands -- one to turn on my driveway lights at dusk, and one to turn them off at 10:30pm. I then set up a 'cron' job to run this shell script every day at 00:05 in the morning. Thus, each day the two 'at' jobs are reissued for the current day. It's necessary to reissue the commands each day because an 'at' job is a one-shot deal, while 'cron' is what you use to run commands at regular intervals.
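The same daily re-issuing can be sketched from Python 3 instead of a shell script. This is not the original script: the 8:45pm "dusk" time, the 'a2' address for the driveway lights, and the 'off' command form are placeholders chosen for illustration.

```python
import subprocess


def build_at_job(when, command):
    """Return (argv, stdin_text) for one 'at' job; nothing is executed here."""
    return ["at"] + when.split(), command + "\n"


def schedule(when, command):
    """Hand the job to the real 'at' program (requires 'at' to be installed)."""
    argv, stdin_text = build_at_job(when, command)
    subprocess.run(argv, input=stdin_text, text=True, check=True)


# Placeholder jobs: times and the house/unit code are assumptions.
DAILY_JOBS = [
    ("8:45pm today", "/usr/local/bin/br a2 on"),
    ("10:30pm today", "/usr/local/bin/br a2 off"),
]

if __name__ == "__main__":
    for when, command in DAILY_JOBS:
        print(build_at_job(when, command))  # swap in schedule(...) to really queue
```

A crontab entry of the form `5 0 * * * /path/to/this_script.py` (path hypothetical) would run it at 00:05 each day.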
This system of 'at', shell scripts, and 'cron' works fine, and demonstrates the simplest Linux/X10 home automation setup. For your needs, this may be all that you require. For me, this was just baby's first steps.
What I want is a more capable solution to the X10 home automation problem. I want to be able to schedule both periodic and one-time events. I want a system that understands how to deal with times like 'sunset' and 'sunrise'. I want to be able to alias the X10 house/unit code that controls my driveway lights as 'driveway lights'. I want to be able to create macros that trigger multiple commands, so that I can execute macro 'wakeup', and have the commands sent to turn on the TV, the coffee maker, and the window blinds opener (insert images of George Jetson being ejected from bed like toast from a toaster). I want to have a web interface to all of this. And, of course, I want to do it all with Python.
Sure, there are several other pre-existing Linux solutions for X10 home automation. For fun and learning, stay tuned as I put together my own solution using Python.
2:03:02 PM
Wednesday, May 05, 2004
Handling CSV Strings, Redux
I've written several times about the Python csv module and its limited API. I presented an adaptor class for use when you want to parse a CSV string, without being locked into reading sequentially from a file. Here's another way of doing it, using Python's standard 'I have a string, but I need a file-like object' adapter, the StringIO module.

    import StringIO # Or use cStringIO, it's faster
    import csv

    # First, create the StringIO object, then the csv reader object
    # using the StringIO object.
    sf = StringIO.StringIO()
    csvReader = csv.reader(sf)

    # Now, write a CSV string out to the StringIO object.
    csvData = '1,2,three,"four,five",6'
    sf.write(csvData)

    # We have to seek the StringIO pseudo-file back to its start.
    sf.seek(0,0)

    # Now read in the same data via the csv reader object.
    parsedData = csvReader.next()
When we look at what parsedData contains, we see a list like so:
[ '1', '2', 'three', 'four,five', '6' ]
Using StringIO rather than the adaptor class I presented earlier is a somewhat more heavy-weight way of solving the problem, but might fit your use better.
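For readers on Python 3, where the StringIO module became io.StringIO and reader objects are advanced with next(), the same idea looks roughly like this (a sketch, not a drop-in replacement for the code above):

```python
import csv
import io

csv_data = '1,2,three,"four,five",6'

# io.StringIO can be built directly from the string, so no write/seek step.
reader = csv.reader(io.StringIO(csv_data))
parsed = next(reader)  # Python 3 spelling of csvReader.next()
print(parsed)  # ['1', '2', 'three', 'four,five', '6']
```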
9:07:14 PM
Python and X10 Home Automation, Part 1
I recently saw an ad from x10.com for a free (you pay shipping) X10 starter kit, including a 'Firecracker' computer interface. That was a deal I couldn't pass up, so I ordered it through their web site, and 3 days later, the kit arrived.
The kit consists of the CM-17a 'Firecracker' serial computer interface, which transmits via radio, a transceiver module which receives the radio commands from the Firecracker and retransmits them via the X10 protocol over your house wiring, a lamp module for controlling... lamps, and a PalmPilot-sized hand-held remote control that lets you manually do what the computer interface does. Oh, and the transceiver module also doubles as an appliance module, allowing you to control appliances of up to 500 watts.
With the hand-held controller, you can control any X10 modules you have, either the ones that come with the kit, or any add-on modules you may want to buy. You could go wild, like many do, and completely automate your home -- lights, appliances, garage door, pool heater, ferret feeder, whatever.
But with the computer interface, things get much more interesting. You can, for example, download from x10.com a free application that duplicates the appearance and functionality of the hand-held controller on your computer screen. Or, you can download, for $20, an application that fully utilizes your computer and the x10 interface to do full automation. Want your hot-tub to turn on at a certain time every day? No problem. Want your lights to simulate an occupied house while you are on vacation? Easy.
Naturally, hand an X10 computer interface to a Python programmer, and he'll immediately start writing code for it. Or that was my intent, anyway. The first thing I did was google around for any existing Python projects for X10. I found two, Pyxal and Pyx10. Both projects seem to be unmaintained. Pyxal is pure Python, and does not support the recent X10 controllers, like the Firecracker. Pyx10 uses a wrapper to turn the XAL library into a Python extension module. It supports recent X10 controllers, including the Firecracker.
I downloaded and examined both. Pyxal was right out, as it has no Firecracker support (why not add it yourself, you ask? I'll get to that in a moment...). Pyx10 and XAL looked good. After compiling and installing XAL (a snap), I tried compiling Pyx10. Nope. The wrapper code for XAL would not compile. From a quick exam, it looked like it was out-of-sync with XAL.
I could have continued hacking at it to get it to work, but with further googling (the trademark police are gonna get me), I found Project WiSH, a project for turning X10 device drivers into... well, Linux device drivers. Super! Instead of having to do low-level device handling from my code, I can simply open a Linux device driver and write commands to it, just like I was writing text to a file. And WiSH was a snap to compile and install. Just make sure you have your kernel source loaded on your machine. (For the CM-17a 'Firecracker', be sure to download the 1.6.10 version of WiSH. The later 2.0.1 version does not yet support it. But both versions support the CM-11a, which is the other modern popular X10 computer interface controller.)
Now, I do my work under Linux, so this is just what the code doctor ordered. Actually, it's even better than it sounds. You see, there's this little bit of info about that Firecracker X10 controller...
If you look at one of the other X10 computer interfaces, say the CM-11a that comes with another of the home automation intro packages that x10.com sells, you will see that it is controlled via the computer in a manner rather like an external serial modem. Connect it to your serial port, and send it strings of ASCII characters. Not so with the CM-17a 'Firecracker'. This little guy is a serial pass-thru 'dongle', very small. From what I can tell from my Google research, you must directly control the radio transmitter in it via bit-twiddling the RTS and DTR lines of the serial port. You must assemble a 5-byte command via bit masking, then bit-shift it out to the CM-17a by directly controlling the states of the RTS and DTR lines, doing the timing yourself. There are no smarts. Ouch. No wonder this is the bargain-basement controller.
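The framing and bit-shifting step can be sketched like this in Python 3. Loud caveats: the header and footer byte values and the most-significant-bit-first order are assumptions taken from recollection of the published CM17A protocol notes and should be checked against that document; the payload below is a placeholder, not a real house/unit/command encoding; and the actual toggling of RTS and DTR with its timing is left out entirely.

```python
HEADER = bytes([0xD5, 0xAA])  # assumed value, verify against the CM17A notes
FOOTER = bytes([0xAD])        # assumed value, same caveat


def build_frame(payload):
    """Wrap a 2-byte command payload into a 5-byte frame."""
    if len(payload) != 2:
        raise ValueError("payload must be exactly 2 bytes")
    return HEADER + bytes(payload) + FOOTER


def frame_bits(frame):
    """Yield the frame's bits one at a time, most significant bit first."""
    for byte in frame:
        for shift in range(7, -1, -1):
            yield (byte >> shift) & 1


# Placeholder payload only; the real command encoding is not shown here.
frame = build_frame(b"\x00\x00")
bits = list(frame_bits(frame))
print(len(frame), len(bits))  # 5 40
```

Each bit yielded here would then be turned into one pulse on the serial control lines by whatever serial library is in use.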
The CM-11a controller has another advantage, too. It's smart, it has its own processor. So you don't even need to leave your computer on to do real-time home automation. Use the scheduling software to send it commands, like 'turn on my security light at local-time dusk, and turn it off at dawn', and the CM-11a will do it, all by itself.
But I don't have the CM-11a. I have a CM-17a and a Linux box. Add in the device drivers from Project WiSH, and from a Linux command line, I can execute 'echo "on" >>/dev/x10/a1', and send the 'on' command to the X10 device at house code 'A', unit code '1'. How cool is that?
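From Python, the equivalent of that echo command is just a file write. Here is a minimal Python 3 sketch; the send_x10 name and the device_root parameter are invented for illustration, and the demo writes into a temporary directory rather than a real /dev/x10 tree.

```python
import os
import tempfile


def send_x10(address, command, device_root="/dev/x10"):
    """Append a command such as 'on' to the device file for e.g. 'a1'."""
    path = os.path.join(device_root, address)
    with open(path, "a") as device:
        device.write(command + "\n")
    return path


# Dry run against a temporary directory instead of real hardware.
with tempfile.TemporaryDirectory() as fake_root:
    written = send_x10("a1", "on", device_root=fake_root)
    with open(written) as check:
        result = check.read()
print(repr(result))  # 'on\n'
```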
OK, how can we combine equal portions of X10, Project WiSH, Linux, Python, and fun? (OK, fun gets a bigger portion.)
Here's the deal. I work for a major software house. We do automated nightly compiles of our code on all of the platforms we support (Linux, various flavors of UNIX, Windoze). The last thing you want is for some code change you made that day to 'break the build'. The automated process sends out email giving that night's build status. If you broke the build, it's supposed to be your first priority to fix it.
I keep forgetting to check my email. I have many projects, they grab my attention, and it may be hours before I check my mail. Yes, I have a little task bar thingie that tells me if I get new mail. I don't look at it if I'm concentrating on a problem.
Python and X10 to the rescue! (This is a fun solution looking for a problem.) I now have a Python script that is run via cron every 10 minutes. It uses the poplib and email modules to grab and parse my email, looking for the specific patterns that a 'you broke the build' message will contain. If it finds such a message, it opens and writes an 'on' command to the proper X10 device driver, which then turns on the BIG RED ROTATING LIGHT. I kid you not.
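The mail-checking half of such a script might look roughly like this in Python 3. It is a sketch under assumptions, not the script described above: the build-break pattern is hypothetical (the real message wording isn't given here), and the host, user, and password arguments are whatever your mail server needs.

```python
import email
import poplib
import re

# Hypothetical pattern; the real build mail's wording is not known.
BUILD_BREAK = re.compile(r"build\s+(failed|broken)", re.IGNORECASE)


def is_build_break(raw_message):
    """Return True if the message's Subject matches the build-break pattern."""
    message = email.message_from_bytes(raw_message)
    return bool(BUILD_BREAK.search(message.get("Subject", "")))


def fetch_messages(host, user, password):
    """Yield each message in a POP3 mailbox as raw bytes."""
    box = poplib.POP3(host)
    try:
        box.user(user)
        box.pass_(password)
        count, _size = box.stat()
        for number in range(1, count + 1):
            _response, lines, _octets = box.retr(number)
            yield b"\r\n".join(lines)
    finally:
        box.quit()


sample = b"Subject: Nightly build FAILED on linux\r\n\r\nSee log.\r\n"
print(is_build_break(sample))  # True
```

A cron-driven script would loop over fetch_messages(...) and write 'on' to the X10 device file as soon as is_build_break returns True.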
This is so much fun!
2:34:14 PM
Tuesday, September 23, 2003
The Python CSV Module and Legacy Data
When you work with csv files as much as I do, particularly with csv files created by legacy applications, you tend to run into the odd problem. Consider the following legacy csv data (a real example):
"this is","an, example","10","of problem data",20
The reader of Python's csv module will turn this into a list like so:
[ "this is", "an, example", "10", "of problem data",20 ]
No problem so far. Now, let's use the csv writer to turn this same data back into csv data again, round trip. Without taking any special precautions, we would get:
this is,"an, example",10,of problem data,20
What happened here? Well, the csv writer will normally only quote data when it contains the field separator. We can get closer to what we want (that is, recreating the original csv data) by using the QUOTE_NONNUMERIC parameter to the writer. When we do, we get:
"this is","an, example",10,"of problem data",20
Closer, but the third field, which was quoted in the original data, is not. We could try using the QUOTE_ALL parameter, which would give us the third field quoted, but unfortunately we'd also get the fifth field quoted, which was not the way the original data had it.
What I need is a way of controlling the quoting of fields on a field by field basis. Sadly, Python's csv module doesn't give me that level of control over field quoting. So when I have to deal with legacy csv data like that above, I'm forced to bypass the csv module for writing, and roll my own. I can still use the csv module for reading.
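A roll-your-own writer with per-field quoting control can be quite small. The following Python 3 sketch is one possible shape, not the code I actually use; the function names and the list of boolean quote flags are invented for illustration.

```python
def format_field(value, quoted, delimiter=",", quotechar='"'):
    """Render one field, quoting it when asked or when it must be quoted."""
    text = str(value)
    must_quote = delimiter in text or quotechar in text or "\n" in text
    if quoted or must_quote:
        # Double any embedded quote characters, as the csv module does.
        text = quotechar + text.replace(quotechar, quotechar * 2) + quotechar
    return text


def format_row(values, quote_flags, delimiter=","):
    """Join fields into one CSV line using a per-field quoting flag."""
    return delimiter.join(
        format_field(value, quoted, delimiter)
        for value, quoted in zip(values, quote_flags)
    )


row = ["this is", "an, example", "10", "of problem data", 20]
flags = [True, True, True, True, False]
print(format_row(row, flags))
# "this is","an, example","10","of problem data",20
```

The flags would normally come from a per-file description of the legacy layout, so the round trip reproduces the original quoting field by field.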
11:29:11 PM
Working With Fixed Record Length CSV Files
Yesterday I wrote about working with fixed record length files, and presented a class for making it easy. Today I'll extend that idea to handle records containing csv data.
The new CSVRecordFile class inherits from the RecordFile class, overriding the read and write methods to add csv parsing and formatting. This allows you to read and write fixed length csv data records in random order. The StringCSVAdaptor class (presented earlier) is used to enable us to use Python 2.3's csv module with strings. This is necessary since the csv module's reader and writer functions expect to work with iterables, such as a file-like object or a sequence.
Matt Goodall took me up on the need for the StringCSVAdaptor class, rightly pointing out a simpler way of handling the problem (thanks, Matt! I think you are reader #4 of this weblog, and the first person to leave me a comment!) Sadly, Matt's suggestion does not fit with the problem domain I'm using CSVRecordFile for. For one thing, I need both a csv reader and writer. I only want to create these objects once per CSVRecordFile instance, and then use them to parse/format many records in random (not sequential) order. Matt's solution, while useful for simple one-shot csv needs, looks to me to require the creation of the reader and writer for each record that is to be parsed (because he wraps the string to be parsed in a list to make it an iterable). You can read Matt's comments from yesterday's post.
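For reference, the list-wrapping approach described above looks roughly like this in Python 3 (a sketch of the idea as I understand it, with a made-up function name):

```python
import csv


def parse_csv_line(line, **csv_params):
    """Parse one CSV string by wrapping it in a one-element list."""
    return next(csv.reader([line], **csv_params))


print(parse_csv_line('1,2,"three, four"'))  # ['1', '2', 'three, four']
```

Note that a new reader object is created on every call, which is exactly the per-record cost discussed above.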
Put this code in a file called csvrecfile.py:

    """This file contains the CSVRecordFile class, for working with fixed length
    record files, where the records contain csv data."""

    __author__ = "Mike Kent"
    __version__ = "$Id$".split()[-2:][0]

    import recfile
    import csv
    import csvadaptor

    class CSVRecordFileFmtError(Exception): pass

    class CSVRecordFile(recfile.RecordFile):
        """This class provides a standard way to handle files which are laid out
        as fixed-length records containing csv data, where each record is padded
        to its proper length with a padding character, and may be optionally
        terminated with a record terminator string."""

        def __init__(self, filename, mode, reclen, recpad='', recterm=None,
                     **csvKwParams):
            recfile.RecordFile.__init__(self, filename, mode, reclen, recpad,
                                        recterm)
            self.csvAdaptor = csvadaptor.StringCSVAdaptor()
            self.csvReader = csv.reader(self.csvAdaptor, **csvKwParams)
            self.csvWriter = csv.writer(self.csvAdaptor, **csvKwParams)
            return

        def read(self, recNum):
            """Read a record containing csv data by record number, and return
            a list of strings.  Record numbers start at 1.  An empty list will
            be returned on end of file."""
            self.csvAdaptor.data = recfile.RecordFile.read(self, recNum)
            try:
                rec = self.csvReader.next()
            except csv.Error:
                raise CSVRecordFileFmtError
            return rec

        def write(self, recNum, valueList):
            """Write a list of mixed-type values to a record, in csv format, by
            record number.  Record numbers start with 1.  The record will be
            padded to the correct length using the padding character, and
            optionally terminated by the record terminator string.  You can
            seek to, and write, records beyond EOF.  However, to append a new
            record to the current actual EOF, give a record number of 0.  This
            function returns the actual record number written to."""
            try:
                self.csvWriter.writerow(valueList)
            except csv.Error:
                raise CSVRecordFileFmtError
            return recfile.RecordFile.write(self, recNum, self.csvAdaptor.data)
Here are the unit tests. Put this code in a file called test_csvrecfile.py:

    #! /usr/bin/env python

    import sys
    import unittest
    import csv
    import csvrecfile

    class TestCases_01_RecordFile(unittest.TestCase):
        def test_01_instantiate(self):
            recFileObj = csvrecfile.CSVRecordFile("test.txt", "w+b", 20, ' ', 'r\n',
                                                  lineterminator='',
                                                  quoting=csv.QUOTE_NONNUMERIC)
            self.assert_(recFileObj is not None)
            return

    class TestCases_02_RecordFileWriteAdd(unittest.TestCase):
        def test_01_writeAddOne(self):
            recFileObj = csvrecfile.CSVRecordFile("test.txt", "w+b", 20, ' ', 'r\n',
                                                  lineterminator='',
                                                  quoting=csv.QUOTE_NONNUMERIC)
            self.assert_(recFileObj is not None)
            rec = [ 1, 2, "three" ]
            recFileObj.write(0, rec)
            recFileObj.flush()

            testFileObj = file("test.txt", "rb")
            # 11 data characters, 7 padding spaces, 2-character terminator.
            expected = '1,2,"three"       r\n'
            newRec = testFileObj.read(20)
            self.assertEqual(newRec, expected)
            return

        def test_02_writeAddSeveral(self):
            recFileObj = csvrecfile.CSVRecordFile("test.txt", "w+b", 20, ' ', 'r\n',
                                                  lineterminator='',
                                                  quoting=csv.QUOTE_NONNUMERIC)
            self.assert_(recFileObj is not None)
            for count in range(5):
                rec = [ 1, 2, "Record %d" % (count + 1) ]
                recFileObj.write(0, rec)
            recFileObj.flush()

            testFileObj = file("test.txt", "rb")
            for count in range(5):
                # 14 data characters, 4 padding spaces, 2-character terminator.
                expected = '1,2,"Record %d"    r\n' % (count + 1)
                newRec = testFileObj.read(20)
                self.assertEqual(newRec, expected)
            return

    class TestCases_03_RecordFileWriteRandom(unittest.TestCase):
        def setUp(self):
            recFileObj = csvrecfile.CSVRecordFile("test.txt", "w+b", 20, ' ', 'r\n',
                                                  lineterminator='',
                                                  quoting=csv.QUOTE_NONNUMERIC)
            self.assert_(recFileObj is not None)
            for count in range(5):
                rec = [ 1, 2, "Record %d" % (count + 1) ]
                recFileObj.write(count + 1, rec)
            recFileObj.close()
            return

        def test_01_writeRandomOne(self):
            recFileObj = csvrecfile.CSVRecordFile("test.txt", "r+b", 20, ' ', 'r\n',
                                                  lineterminator='',
                                                  quoting=csv.QUOTE_NONNUMERIC)
            self.assert_(recFileObj is not None)
            recNum = 2
            rec = [ 3, 4, "New record %d" % recNum ]
            recFileObj.write(recNum, rec)
            recFileObj.flush()

            testFileObj = file("test.txt", "rb")
            # 18 data characters fill the record, so there is no padding.
            expected = '3,4,"New record %d"r\n' % recNum
            testFileObj.seek(20)
            newRec = testFileObj.read(20)
            self.assertEqual(newRec, expected)
            return

        def test_02_writeRandomSeveral(self):
            recFileObj = csvrecfile.CSVRecordFile("test.txt", "r+b", 20, ' ', 'r\n',
                                                  lineterminator='',
                                                  quoting=csv.QUOTE_NONNUMERIC)
            self.assert_(recFileObj is not None)
            recNumList = [ 1, 5, 3, 2, 4 ]
            for recNum in recNumList:
                rec = [ 1, 2, "New record %d" % recNum ]
                recFileObj.write(recNum, rec)
            recFileObj.flush()

            testFileObj = file("test.txt", "rb")
            for count in range(5):
                expected = '1,2,"New record %d"r\n' % recNumList[count]
                testFileObj.seek((recNumList[count] - 1) * 20)
                newRec = testFileObj.read(20)
                self.assertEqual(newRec, expected)
            return

    class TestCases_04_RecordFileReadRandom(unittest.TestCase):
        def setUp(self):
            recFileObj = csvrecfile.CSVRecordFile("test.txt", "w+b", 20, ' ', 'r\n',
                                                  lineterminator='',
                                                  quoting=csv.QUOTE_NONNUMERIC)
            self.assert_(recFileObj is not None)
            for count in range(5):
                rec = [ 1, 2, "Record %d" % (count + 1) ]
                recFileObj.write(count + 1, rec)
            recFileObj.close()
            return

        def test_01_readRandomOne(self):
            recFileObj = csvrecfile.CSVRecordFile("test.txt", "r+b", 20, ' ', 'r\n',
                                                  lineterminator='',
                                                  quoting=csv.QUOTE_NONNUMERIC)
            rec = recFileObj.read(3)
            expected = [ "1", "2", "Record 3" ]
            self.assertEqual(rec, expected)
            return

        def test_02_readRandomSeveral(self):
            recFileObj = csvrecfile.CSVRecordFile("test.txt", "r+b", 20, ' ', 'r\n',
                                                  lineterminator='',
                                                  quoting=csv.QUOTE_NONNUMERIC)
            recNumList = [ 1, 5, 3, 1, 2, 2, 4 ]
            for recNum in recNumList:
                rec = recFileObj.read(recNum)
                expected = [ "1", "2", "Record %d" % recNum ]
                self.assertEqual(rec, expected)
            return

    if __name__ == "__main__":
        unittest.main()
        sys.exit(0)
1:04:57 PM
Monday, September 22, 2003
Working With Fixed Record Length Files
I'm often called upon to work with data files that contain records composed of CSV data, where all of the records in the file have one fixed record length. Here's a small class I wrote to make the handling of fixed record length files easy. It does not handle the parsing of CSV data itself; for that, use my CSV Adaptor from my previous article.
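To make the file layout concrete, here's a minimal sketch of the arithmetic behind a fixed record length file: record N starts at byte (N - 1) times the record length. It is not the class itself. The read_record helper, the 20-byte record length, the space padding, and the "\r\n" terminator are assumptions chosen for illustration, and the sketch targets Python 3 with an in-memory buffer standing in for a real file.

```python
import io

RECLEN = 20  # assumed record length for this illustration


def read_record(fileobj, recnum, reclen=RECLEN, recpad=b" ", recterm=b"\r\n"):
    """Return record recnum (1-based) with terminator and padding stripped."""
    # Every record has the same length, so the offset is a simple multiple.
    fileobj.seek((recnum - 1) * reclen)
    rec = fileobj.read(reclen)
    if rec.endswith(recterm):
        rec = rec[:-len(recterm)]
    return rec.rstrip(recpad)


# Build three 20-byte records: data, space padding, then the terminator.
data = b"".join(
    ("Record %d" % n).encode("ascii").ljust(RECLEN - 2) + b"\r\n"
    for n in (1, 2, 3)
)
buf = io.BytesIO(data)
second = read_record(buf, 2)
```

Reading past the last record in this sketch just yields an empty bytes object, since the underlying read returns nothing at EOF.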
Put this code in a file named recfile.py:

"""This file contains the RecordFile class, for working with fixed length
record files."""

__author__ = "Mike Kent"
__version__ = "$Id$".split()[-2:][0]


class RecordFileOpenError(Exception): pass
class RecordFileReadError(Exception): pass
class RecordFileTruncError(Exception): pass
class RecordFileWriteError(Exception): pass


class RecordFile:
    """This class provides a standard way to handle files which are laid out
    as fixed-length records, where each record is padded to its proper length
    with a padding character, and may be optionally terminated with a record
    terminator string."""

    def __init__(self, filename, mode, reclen, recpad=' ', recterm=None):
        """The default record padding string is a single space.  The record
        terminator defaults to None."""
        try:
            self._file = file(filename, mode)
        except IOError:
            raise RecordFileOpenError

        self.recLen = reclen
        self.recPad = recpad
        self.recTerm = recterm
        self.lenRecTerm = recterm and len(recterm) or 0
        return

    def close(self):
        if hasattr(self, "_file"):
            self._file.close()
        return

    __del__ = close

    def flush(self):
        self._file.flush()
        return

    def read(self, recNum):
        """Read a record by number, and return a string.  Record numbers
        start at 1.  The resulting string will have any record terminator or
        padding specified on class initialization stripped.  An empty string
        will be returned on EOF."""
        if recNum < 1:
            raise RecordFileReadError

        try:
            self._file.seek((recNum - 1) * self.recLen)
            rec = self._file.read(self.recLen)
        except IOError:
            raise RecordFileReadError

        lenRec = len(rec)

        # If we got a record...
        if lenRec > 0:
            # If what we read was too short, or it's supposed to have a record
            # terminator, but it's not there...
            if (lenRec < self.recLen or
                (self.lenRecTerm and not rec.endswith(self.recTerm))):
                raise RecordFileReadError

            # If it is supposed to have a record terminator, and it does,
            # strip it.
            if self.lenRecTerm and rec.endswith(self.recTerm):
                rec = rec[:-self.lenRecTerm]

            # If there is padding present, strip it.
            if len(self.recPad):
                rec = rec.rstrip(self.recPad)

        return rec

    def write(self, recNum, data):
        """Write a string to a record by record number.  Record numbers start
        with 1.  The record will be padded to the correct length using the
        padding character, and optionally terminated by the record terminator
        string.  You can seek to, and write, records beyond EOF.  However, to
        append a new record to the current actual EOF, give a record number
        of 0.  This function returns the actual record number written to."""
        newRecNum = recNum
        lenData = len(data)

        # Calculate the amount of padding needed.
        paddingNeeded = self.recLen - (lenData + self.lenRecTerm)

        # If that amount is negative, the record data is too long to fit.
        if paddingNeeded < 0:
            raise RecordFileTruncError

        # If padding is needed, append it to the record data.
        if paddingNeeded > 0:
            data += self.recPad * paddingNeeded

        # If a record terminator is wanted, append it to the record data.
        if self.lenRecTerm:
            data += self.recTerm

        # If the record number is zero, we want to seek to the current
        # end of file...
        if recNum == 0:
            offset = 0
            whence = 2  # Seek relative to the end
        # Else we want to seek to the beginning of the specified record.
        else:
            offset = (recNum - 1) * self.recLen
            whence = 0  # Seek relative to the beginning

        try:
            self._file.seek(offset, whence)

            # If we are writing to the current end of file,
            # calculate what that record number is.
            if recNum == 0:
                newRecNum = (self._file.tell() // self.recLen) + 1

            self._file.write(data)
        except IOError:
            raise RecordFileWriteError

        # Return the actual record number written to.
        return newRecNum
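The padding and append-at-EOF logic in write is the part most worth checking by hand, so here's a small standalone sketch of just that arithmetic. It's an illustration under my own assumptions, not a replacement for the class: pack_record and append_record are hypothetical helper names, the code is written for Python 3, and an io.BytesIO buffer stands in for the file.

```python
import io


def pack_record(data, reclen, recpad=b" ", recterm=b"\r\n"):
    """Pad data so that data + padding + terminator is exactly reclen bytes."""
    padding_needed = reclen - (len(data) + len(recterm))
    if padding_needed < 0:
        raise ValueError("record data is too long for the record length")
    return data + recpad * padding_needed + recterm


def append_record(fileobj, data, reclen):
    """Append a record at EOF and return the record number written to."""
    fileobj.seek(0, io.SEEK_END)
    # The file holds tell() // reclen whole records, so the new one is next.
    recnum = fileobj.tell() // reclen + 1
    fileobj.write(pack_record(data, reclen))
    return recnum


buf = io.BytesIO()
first = append_record(buf, b"this is a test", 20)
second = append_record(buf, b"Record 2", 20)
```

Note the floor division: in Python 3 a plain / on the result of tell() would give a float, so // is the safer spelling for a record number.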
Although you might not think so from my previous posts to this weblog, I'm a firm believer in unit testing, so here are the tests for the RecordFile class.
Put this code in a file named test_recfile.py:

#! /usr/bin/env python

import sys
import unittest
import recfile


class TestCases_01_RecordFile(unittest.TestCase):

    def test_01_instantiate(self):
        recFileObj = recfile.RecordFile("test.txt", "w+b", 20, ' ', '\r\n')
        self.assert_(recFileObj is not None)
        return


class TestCases_02_RecordFileWriteAdd(unittest.TestCase):

    def test_01_writeAddOne(self):
        recFileObj = recfile.RecordFile("test.txt", "w+b", 20, ' ', '\r\n')
        self.assert_(recFileObj is not None)
        rec = "this is a test"
        recFileObj.write(0, rec)
        recFileObj.flush()
        testFileObj = file("test.txt", "rb")
        expected = "this is a test    \r\n"
        newRec = testFileObj.read(20)
        self.assertEqual(newRec, expected)
        return

    def test_02_writeAddSeveral(self):
        recFileObj = recfile.RecordFile("test.txt", "w+b", 20, ' ', '\r\n')
        self.assert_(recFileObj is not None)
        for count in range(5):
            rec = "Record %d" % (count + 1)
            recFileObj.write(0, rec)
        recFileObj.flush()
        testFileObj = file("test.txt", "rb")
        for count in range(5):
            expected = "Record %d          \r\n" % (count + 1)
            newRec = testFileObj.read(20)
            self.assertEqual(newRec, expected)
        return


class TestCases_03_RecordFileWriteRandom(unittest.TestCase):

    def setUp(self):
        recFileObj = recfile.RecordFile("test.txt", "w+b", 20, ' ', '\r\n')
        self.assert_(recFileObj is not None)
        for count in range(5):
            rec = "Record %d" % (count + 1)
            recFileObj.write(0, rec)
        recFileObj.close()
        return

    def test_01_writeRandomOne(self):
        recFileObj = recfile.RecordFile("test.txt", "r+b", 20, ' ', '\r\n')
        self.assert_(recFileObj is not None)
        rec = "this is a test"
        recFileObj.write(2, rec)
        recFileObj.flush()
        testFileObj = file("test.txt", "rb")
        expected = "this is a test    \r\n"
        testFileObj.seek(20)
        newRec = testFileObj.read(20)
        self.assertEqual(newRec, expected)
        return

    def test_02_writeRandomSeveral(self):
        recFileObj = recfile.RecordFile("test.txt", "r+b", 20, ' ', '\r\n')
        self.assert_(recFileObj is not None)
        recNumList = [ 1, 5, 3, 2, 4 ]
        for recNum in recNumList:
            rec = "New record %d" % recNum
            recFileObj.write(recNum, rec)
        recFileObj.flush()
        testFileObj = file("test.txt", "rb")
        for count in range(5):
            expected = "New record %d      \r\n" % recNumList[count]
            testFileObj.seek((recNumList[count] - 1) * 20)
            newRec = testFileObj.read(20)
            self.assertEqual(newRec, expected)
        return


class TestCases_04_RecordFileReadRandom(unittest.TestCase):

    def setUp(self):
        recFileObj = recfile.RecordFile("test.txt", "w+b", 20, ' ', '\r\n')
        self.assert_(recFileObj is not None)
        for count in range(5):
            rec = "Record %d" % (count + 1)
            recFileObj.write(0, rec)
        recFileObj.close()
        return

    def test_01_readRandomOne(self):
        recFileObj = recfile.RecordFile("test.txt", "r+b", 20, ' ', '\r\n')
        rec = recFileObj.read(3)
        expected = "Record 3"
        self.assertEqual(rec, expected)
        return

    def test_02_readRandomSeveral(self):
        recFileObj = recfile.RecordFile("test.txt", "r+b", 20, ' ', '\r\n')
        recNumList = [ 1, 5, 3, 1, 2, 2, 4 ]
        for recNum in recNumList:
            rec = recFileObj.read(recNum)
            expected = "Record %d" % recNum
            self.assertEqual(rec, expected)
        return


if __name__ == "__main__":
    unittest.main()
    sys.exit(0)
4:47:19 PM
© Copyright 2004 Michael Kent. Last update: 6/15/2004; 4:12:06 PM.