A bit of an experiment…
I was tinkering with some code the other day to pull down some images from FlickR using one of their RESTful APIs for searching.
Essentially, it revolves around building up a URI to represent the search which (in my case) looks something like;
http://api.flickr.com/services/rest/?method=flickr.photos.search&
and then you add parameters to the query string for api_key, text, page, per_page and content_type, where I’ve found that a content_type of “7” (which asks for photos, screenshots and “other” content all together) works really nicely.
If you set one of those things up then you get back a lump of XML (in this case) and that might look something like this one that I got from searching for the word “Hill”;
<rsp stat="ok">
  <photos page="1" pages="1085800" perpage="5" total="5429000">
    <photo id="5393713311" owner="40865375@N06" secret="0d67e354d2" server="5300" farm="6" title="Hills." ispublic="1" isfriend="0" isfamily="0" />
    <photo id="5395344638" owner="26476877@N04" secret="804f2e0b8e" server="5018" farm="6" title="The Beast of the East (Save Theford Forest)" ispublic="1" isfriend="0" isfamily="0" />
    <photo id="5395348500" owner="48424317@N03" secret="bd5df5d37f" server="5019" farm="6" title="Nearby Taramundi" ispublic="1" isfriend="0" isfamily="0" />
    <photo id="5394747587" owner="22003551@N07" secret="2bcf2d843e" server="5098" farm="6" title="" ispublic="1" isfriend="0" isfamily="0" />
    <photo id="5395348970" owner="8863279@N08" secret="659d24bc7b" server="5218" farm="6" title="Bellville Badassness" ispublic="1" isfriend="0" isfamily="0" />
  </photos>
</rsp>
and there’s then a magic way that you go from this XML to an actual photo by constructing another Uri that looks something like;
http://farm6.static.flickr.com/5300/5393713311_0d67e354d2.jpg
and that gets you to an actual photograph.
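The mapping from a photo element to an image address can be sketched in a few lines. This is only an illustration of the pattern shown above: the names PhotoUriSketch and ToImageUri are made up for the example, and it assumes the farm, server, id and secret attributes are always present.

```csharp
using System;
using System.Xml.Linq;

static class PhotoUriSketch
{
    // Hypothetical helper: builds the static image address from one <photo> element.
    // Assumes the farm, server, id and secret attributes all exist.
    public static Uri ToImageUri(XElement photo)
    {
        return new Uri(string.Format(
            "http://farm{0}.static.flickr.com/{1}/{2}_{3}.jpg",
            (string)photo.Attribute("farm"),
            (string)photo.Attribute("server"),
            (string)photo.Attribute("id"),
            (string)photo.Attribute("secret")));
    }

    static void Main()
    {
        // First photo from the sample response, trimmed to the attributes that matter.
        XElement photo = XElement.Parse(
            "<photo id=\"5393713311\" secret=\"0d67e354d2\" server=\"5300\" farm=\"6\" />");

        // Prints http://farm6.static.flickr.com/5300/5393713311_0d67e354d2.jpg
        Console.WriteLine(ToImageUri(photo));
    }
}
```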
I wanted a stream of these photographs and, traditionally, I’d have written code that did something like;
- Build something that makes up the URI representing a search.
- Open up an HttpWebRequest of some kind.
- Grab the response.
- Read the XML.
- Parse the XML back into some kind of results class.
- Fire some kind of event saying “I have a new page of results data”
- Fire some kind of sub-event for each photo saying “I have a new photo result”
but it occurred to me that a “stream of photographs” is something that the Reactive Extensions (Rx) is well placed to deliver so I thought I’d experiment with that.
I ended up being able to do something like this from the point of view of some consuming code;
FlickrSearch s = new FlickrSearch("grape", TimeSpan.FromSeconds(1));

s.PhotoResults.Subscribe(
    p =>
    {
        Console.WriteLine(
            "Results Page {0}, Photo {1}, [{2}]",
            p.PageNo, p.Title, p.ImageUri);
    },
    () =>
    {
        Console.WriteLine("Done");
    });

Console.ReadLine();
and results from FlickR go streaming by with, in this case, a new image arriving every second.
This is “powered” by an almost certainly defective FlickrSearch class which I had at least 5 attempts at before ending up with this;
class FlickrSearch
{
    public FlickrSearch(string searchText, TimeSpan interval)
    {
        this.state = new FlickSearchState(searchText);
        this.interval = interval;
        this.photoResults = new Lazy<IObservable<FlickrPhotoResult>>(
            MakePhotoResults);
    }
    public IObservable<FlickrPhotoResult> PhotoResults
    {
        get
        {
            return (this.photoResults.Value);
        }
    }
    IObservable<FlickrPhotoResult> MakePhotoResults()
    {
        var webRequests = this.state.GetObservableSearchUris().SelectMany(
            uri =>
            {
                HttpWebRequest wr = WebRequest.Create(uri) as HttpWebRequest;

                return (Observable.Defer(Observable.FromAsyncPattern<WebResponse>(
                    wr.BeginGetResponse, wr.EndGetResponse)));
            });

        var xmlResponses = webRequests.Select(
            wr =>
            {
                XElement xml = null;
                FlickrPhotoPageResult result = null;

                using (Stream stream = wr.GetResponseStream())
                {
                    xml = XElement.Load(stream);
                    result = new FlickrPhotoPageResult(xml);
                }
                return (result);
            });

        var photoResponses = xmlResponses.SelectMany(
            pageResponse =>
            {
                return (pageResponse.GetPhotos());
            });

        var slowedDownResponses = photoResponses.Zip(
            Observable.Interval(this.interval),
            (photo, count) =>
            {
                return (photo);
            });

        var slowedDownLoadsNextPage = slowedDownResponses.Do(p =>
        {
            // TODO: not sure about this having a side-effect which
            // can advance/end the observable at the "tail of the
            // chain" here.
            if (p.IsLastImage)
            {
                this.state.Advance(end:true);
            }
            else if (p.IsLastImageOnCurrentPage)
            {
                this.state.Advance(end:false);
            }
        });

        return (slowedDownLoadsNextPage);
    }
    Lazy<IObservable<FlickrPhotoResult>> photoResults;
    FlickSearchState state;
    TimeSpan interval;
}
The “trick” that I was trying to achieve is in the MakePhotoResults method and what it’s trying to do is to set up a sort of pipeline of sequences which run something like this;
1. search URI strings are produced
2. and consumed by turning them into HttpWebResponses
3. consumed by reading XML from them into an object that represents each page of results
4. consumed by splitting that XML into objects that represent each photo in those results
5. consumed by a call that tries to restrict that down to only produce results on the interval specified to slow things down a little
6. consumed by a call that tries to figure out if we are at the end of a page or at the end of the results altogether and, if so, take appropriate action to end or continue the original sequence. That is – to cause step 1 above to fire either a new URI for the next page of search results or cause it to end the sequence.
and I didn’t feel “too bad” about that but I’m not 100% sure that this link between step 6 and step 1 is “safe” or “right” – it leaves me feeling a little itchy.
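One way to avoid the link from the tail of the chain back to its head is to let each page decide whether another page follows, so that no Subject has to be poked from inside a Do(). The sketch below is only one possible shape, not a drop-in replacement: Page, PagingSketch and AllPages are hypothetical names, the fetch is faked, and it assumes a current System.Reactive package (the System.Reactive.Linq namespace) rather than whichever Rx build the code above was written against.

```csharp
using System;
using System.Reactive.Linq; // assumption: the System.Reactive NuGet package is referenced

// Hypothetical stand-in for one page of search results.
class Page
{
    public int PageNo;
    public int Pages;
}

static class PagingSketch
{
    // Emits page N and, only once page N has arrived, asks for page N + 1.
    // Defer keeps fetchPage from being called until something subscribes.
    public static IObservable<Page> AllPages(
        Func<int, IObservable<Page>> fetchPage, int pageNo = 1)
    {
        return Observable.Defer(() => fetchPage(pageNo)).SelectMany(page =>
            page.PageNo >= page.Pages
                ? Observable.Return(page)
                : Observable.Return(page).Concat(AllPages(fetchPage, pageNo + 1)));
    }

    static void Main()
    {
        // Fake fetch: pretends that every search has exactly three pages.
        Func<int, IObservable<Page>> fake =
            n => Observable.Return(new Page { PageNo = n, Pages = 3 });

        AllPages(fake).Subscribe(
            p => Console.WriteLine("Page {0} of {1}", p.PageNo, p.Pages),
            () => Console.WriteLine("Done"));
    }
}
```

The trade-off is that this version requests the next page as soon as the previous one has arrived, not when the slowed-down consumer reaches the last photo on a page. For a search with over a million pages that would need some throttling of its own, so treat it as a starting point only.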
In order to try and make this work, this code relies on a helper class, FlickSearchState, which looks like;
class FlickSearchState
{
    public FlickSearchState(string searchText)
    {
        this.searchUri = new FlickrSearchUri();
        this.searchUri.SearchText = searchText;
        this.searchUris = new Subject<string>();
    }
    public IObservable<string> GetObservableSearchUris()
    {
        return (Observable.Return<string>(this.searchUri.Uri).Concat(
            this.searchUris));
    }
    public void Advance(bool end = false)
    {
        if (end)
        {
            this.searchUris.OnCompleted();
        }
        else
        {
            // TODO: Is this safe? I suspect not.
            this.searchUri.Page++;
            this.searchUris.OnNext(this.searchUri.Uri);
        }
    }
    Subject<string> searchUris;
    FlickrSearchUri searchUri;
}
The GetObservableSearchUris method tries to return an IObservable<string> by concatenating the initial URI (which will be for page 1) with any subsequent URIs (for page 2 etc). Those subsequent URIs will only be generated if someone calls Advance(false), and the sequence will end when someone calls Advance(true).
This introduced me to Subject<T> which I hadn’t encountered before. I suspect that the increment of the page number in the Advance method is likely to be at risk from a concurrency problem.
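If the page number really is touched from more than one thread, the increment on its own can be made atomic with Interlocked. The sketch below shows just that one piece; PageCounter is a hypothetical class, not part of the code above, and it does nothing to serialise the OnNext call that follows the increment in Advance, which would need its own protection.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical replacement for the bare "Page++".
class PageCounter
{
    int page = 1;

    public int Current
    {
        get { return Volatile.Read(ref this.page); }
    }

    // Atomically moves to the next page and returns its number.
    public int Next()
    {
        return Interlocked.Increment(ref this.page);
    }
}

static class CounterSketch
{
    static void Main()
    {
        var counter = new PageCounter();

        // 1000 increments from many threads; none of them is lost.
        Parallel.For(0, 1000, i => counter.Next());

        Console.WriteLine(counter.Current); // 1001
    }
}
```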
This code relies on a simple class that knows how to construct a FlickR URI given some parameters;
public class FlickrSearchUri
{
    public FlickrSearchUri()
    {
        ContentType = 7; // not a magic number, this is "lucky 7" 😉
        PerPage = 5;
#error "Need to get an API key"
        ApiKey = string.Empty;
        Page = 1;
    }
    public int ContentType { get; set; }
    public int PerPage { get; set; }
    public int Page { get; set; }
    public string SearchText { get; set; }
    public string ApiKey { get; set; }

    public string Uri
    {
        get
        {
            return (
                string.Format(
                    "http://api.flickr.com/services/rest/?method=flickr.photos.search&" +
                    "api_key={0}&" +
                    "text={1}&" +
                    "page={2}&" +
                    "per_page={3}&" +
                    "content_type={4}",
                    ApiKey, SearchText, Page, PerPage, ContentType));
        }
    }
}
and it is used to build the initial Uri and then subsequent ones are produced by simply changing the Page property and then re-requesting the Uri property.
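One thing the class above does not do is escape the search text, so a search containing a space or an ampersand would produce a malformed query string. A minimal sketch of the same format string with escaping added might look like this; SearchUriSketch and Build are hypothetical names and "YOUR_API_KEY" is a placeholder, not a real key.

```csharp
using System;

static class SearchUriSketch
{
    // Hypothetical helper: the same query string as above, with the
    // string values passed through Uri.EscapeDataString.
    public static string Build(
        string apiKey, string text, int page, int perPage, int contentType)
    {
        return string.Format(
            "http://api.flickr.com/services/rest/?method=flickr.photos.search" +
            "&api_key={0}&text={1}&page={2}&per_page={3}&content_type={4}",
            Uri.EscapeDataString(apiKey),
            Uri.EscapeDataString(text),
            page, perPage, contentType);
    }

    static void Main()
    {
        // The space and ampersand come out as %20 and %26 in the text parameter.
        Console.WriteLine(Build("YOUR_API_KEY", "hill & dale", 1, 5, 7));
    }
}
```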
There are a couple of classes that then deal with the results. The first handles a page of results from the REST API (so long as there aren’t any errors, which I ignore completely);
public class FlickrPhotoPageResult
{
    public FlickrPhotoPageResult(XElement xmlFlickrResponse)
    {
        this.xmlFlickrResponse = xmlFlickrResponse;

        XElement element = xmlFlickrResponse.DescendantsAndSelf("photos").Single();

        this.PageNo = (int)element.Attribute("page");
        this.Pages = (long)element.Attribute("pages");
        this.PerPage = (int)element.Attribute("perpage");
        this.Total = (long)element.Attribute("total");
    }
    public IObservable<FlickrPhotoResult> GetPhotos()
    {
        return ((
            this.xmlFlickrResponse.DescendantsAndSelf("photo").Select(
                (p, i) =>
                {
                    return (new FlickrPhotoResult(p, i + 1, this));
                })).ToObservable());
    }
    public int PageNo { get; private set; }
    public long Pages { get; private set; }
    public int PerPage { get; private set; }
    public long Total { get; private set; }
    XElement xmlFlickrResponse;
}
there’s a sneaky bit of work going on here in the GetPhotos() method in that the instance passes both itself and an index down to the FlickrPhotoResult in order that the particular photograph result can attempt to later determine whether it represents;
- the last photograph in the whole result-set
- the last photograph on a particular page in the result-set.
and you can see some of that going on here in the FlickrPhotoResult which represents a single photo;
public class FlickrPhotoResult
{
    public FlickrPhotoResult(
        XElement photo,
        int positionOnPage,
        FlickrPhotoPageResult resultsPage)
    {
        this.resultsPage = resultsPage;
        this.positionOnPage = positionOnPage;

        Id = (long)photo.Attribute("id");
        Secret = photo.Attribute("secret").Value;
        Farm = (int)photo.Attribute("farm");
        Server = (int)photo.Attribute("server");
        Title = photo.Attribute("title").Value;
    }
    public int PageNo
    {
        get
        {
            return (this.resultsPage.PageNo);
        }
    }
    public bool IsLastImage
    {
        get
        {
            long imageNo =
                ((this.resultsPage.PageNo - 1) * this.resultsPage.PerPage) +
                this.positionOnPage;

            return (imageNo == this.resultsPage.Total);
        }
    }
    public bool IsLastImageOnCurrentPage
    {
        get
        {
            return (this.positionOnPage == (this.resultsPage.PerPage));
        }
    }
    public long Id { get; private set; }
    public string Secret { get; private set; }
    public int Farm { get; private set; }
    public string Title { get; private set; }
    public int Server { get; private set; }

    public Uri ImageUri
    {
        get
        {
            return (new Uri(string.Format(
                "http://farm{0}.static.flickr.com/{1}/{2}_{3}.jpg",
                Farm, Server, Id, Secret)));
        }
    }
    FlickrPhotoPageResult resultsPage;
    int positionOnPage;
}
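The page-result class above ignores errors entirely. A cheap first step would be to check the stat attribute on the rsp element before looking for photos, along the lines of the sketch below. ResponseCheckSketch and EnsureOk are hypothetical names, and the shape of the failure response (an err element carrying code and msg attributes) is an assumption here, with made-up values, rather than something taken from the sample above.

```csharp
using System;
using System.Xml.Linq;

static class ResponseCheckSketch
{
    // Hypothetical guard: returns the response unchanged when stat="ok",
    // otherwise throws with whatever error details can be found.
    public static XElement EnsureOk(XElement rsp)
    {
        if ((string)rsp.Attribute("stat") == "ok")
        {
            return rsp;
        }

        XElement err = rsp.Element("err");
        string code = err == null ? "?" : (string)err.Attribute("code");
        string msg = err == null ? "no <err> element" : (string)err.Attribute("msg");

        throw new InvalidOperationException(
            string.Format("Flickr call failed ({0}): {1}", code, msg));
    }

    static void Main()
    {
        // Assumed shape of a failure response; the values are made up.
        XElement failed = XElement.Parse(
            "<rsp stat=\"fail\"><err code=\"100\" msg=\"Invalid API Key\" /></rsp>");

        try
        {
            EnsureOk(failed);
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message); // Flickr call failed (100): Invalid API Key
        }
    }
}
```

In an Rx pipeline the exception thrown from inside a Select would surface through the subscriber's OnError handler, so the consuming code would also need to pass one to Subscribe.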
Whilst writing this experimental (i.e. not working properly) code I had a few thoughts;
- I find it pretty hard to reason about some of the code and the “context” in which it’s called. For example – I’m fairly certain that I am sharing FlickrSearchUri.Page across threads and am not protecting it properly but I haven’t verified that this is the case whereas if I was writing traditional (non Rx) code I’d perhaps have more idea about the context in which my code is being called.
- At one point I had a bug where the code wasn’t stopping when it reached the end of the result-set from FlickR having gone through N pages of data. I found it pretty hard to debug as my lambdas were firing in response to a lot of call-stacks that weren’t “so obvious” to figure out.
- I feel I’m giving the class FlickrPhotoResult too much responsibility in having the knowledge around whether a particular photo is the last on a page or the very last photo altogether. I suspect I should be altering my Select() calls instead to avoid having to pass this data around inside linked object instances.
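On the last point, one possible shape for moving that knowledge out of the photo object is to work out both flags once, in the Select() that projects the page, and hand each photo plain values. The sketch below uses ordinary LINQ over the XML so that it runs on its own; TaggedPhoto, TaggingSketch and Tag are hypothetical names, and it deliberately differs from the code above in two ways: "last on page" means the last element actually present, and "last overall" uses >= rather than ==.

```csharp
using System;
using System.Linq;
using System.Xml.Linq;

// Hypothetical plain result: the flags are computed once, outside the photo.
class TaggedPhoto
{
    public string Title;
    public bool IsLastOnPage;
    public bool IsLastOverall;
}

static class TaggingSketch
{
    public static TaggedPhoto[] Tag(XElement rsp)
    {
        XElement photos = rsp.DescendantsAndSelf("photos").Single();
        int page = (int)photos.Attribute("page");
        int perPage = (int)photos.Attribute("perpage");
        long total = (long)photos.Attribute("total");
        XElement[] items = photos.Elements("photo").ToArray();

        return items.Select((p, i) => new TaggedPhoto
        {
            Title = (string)p.Attribute("title"),
            // Last element actually present, so a short final page still counts.
            IsLastOnPage = i == items.Length - 1,
            // 1-based position of this photo in the whole result set.
            IsLastOverall = ((long)(page - 1) * perPage) + i + 1 >= total
        }).ToArray();
    }

    static void Main()
    {
        // Made-up response: page 2 of 2, two per page, three photos in total.
        XElement rsp = XElement.Parse(
            "<rsp stat=\"ok\"><photos page=\"2\" pages=\"2\" perpage=\"2\" total=\"3\">" +
            "<photo title=\"Last one\" /></photos></rsp>");

        foreach (TaggedPhoto t in Tag(rsp))
        {
            // Prints: Last one: lastOnPage=True, lastOverall=True
            Console.WriteLine("{0}: lastOnPage={1}, lastOverall={2}",
                t.Title, t.IsLastOnPage, t.IsLastOverall);
        }
    }
}
```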
Anyway, I learned one or two things while playing around with this and so I figured I’d take the same code and bring it into a graphical environment so I put a quick WPF shell around it.
This raised a couple of “issues”, both of which were pretty obvious;
- Having a URI is one thing, having a downloaded picture is another.
- There’s also the UI thread to remember but Rx takes care of that pretty nicely.
To work around the first “problem”, I attempted to have my observable sequence ultimately return a Stream containing the downloaded image rather than just a result object that carries the URI of the image. That is, I “added extra clauses”;
IObservable<Stream> MakePhotoResults()
{
    var webRequests = this.state.GetObservableSearchUris().SelectMany(
        uri =>
        {
            HttpWebRequest wr = WebRequest.Create(uri) as HttpWebRequest;

            return (Observable.Defer(Observable.FromAsyncPattern<WebResponse>(
                wr.BeginGetResponse, wr.EndGetResponse)));
        });

    var xmlResponses = webRequests.Select(
        wr =>
        {
            XElement xml = null;
            FlickrPhotoPageResult result = null;

            using (Stream stream = wr.GetResponseStream())
            {
                xml = XElement.Load(stream);
                result = new FlickrPhotoPageResult(xml);
            }
            return (result);
        });

    var photoResponses = xmlResponses.SelectMany(
        pageResponse =>
        {
            return (pageResponse.GetPhotos());
        });

    var slowedDownResponses = photoResponses.Zip(
        Observable.Interval(this.interval),
        (photo, count) =>
        {
            return (photo);
        });

    var slowedDownLoadsNextPage = slowedDownResponses.Do(p =>
    {
        // TODO: not sure about this having a side-effect which
        // can advance/end the observable at the "tail of the
        // chain" here.
        if (p.IsLastImage)
        {
            this.state.Advance(end: true);
        }
        else if (p.IsLastImageOnCurrentPage)
        {
            this.state.Advance(end: false);
        }
    });

    var streams = slowedDownLoadsNextPage.SelectMany(
        photo =>
        {
            HttpWebRequest request = WebRequest.Create(photo.ImageUri) as HttpWebRequest;

            return (Observable.Defer(
                Observable.FromAsyncPattern<WebResponse>(
                    request.BeginGetResponse, request.EndGetResponse)));
        });

    var byteArrays = streams.Select(
        resp =>
        {
            HttpWebResponse webResponse = (HttpWebResponse)resp;
            MemoryStream ms = new MemoryStream();

            using (Stream s = resp.GetResponseStream())
            {
                s.CopyTo(ms);
            }
            ms.Seek(0, SeekOrigin.Begin);
            return (ms);
        });

    return (byteArrays);
}
Once again – not sure this is entirely right (or even nearly right) but at least at a basic level it leaves me with an observable sequence of Stream which I ultimately just consumed and solved problem number 2 by having Rx do the work for me;
FlickrSearch search = new FlickrSearch(text, TimeSpan.FromSeconds(3));

this.currentSearch = search.PhotoResults.
    ObserveOn(Dispatcher.CurrentDispatcher).
    Subscribe(stream =>
    {
        BitmapImage bi = new BitmapImage();
        bi.BeginInit();
        bi.StreamSource = stream;
        bi.EndInit();
        this.Image = bi;
    });
with the ObserveOn moving the work back to the UI thread.
All in all, I found trying to work “the Rx way” really interesting and in some ways I was taken back to around 20 years ago when we were students trying to solve the “farmer, wolf, goat and cabbage” problem in LISP in that this is very functional.
Feel very free to comment on the code and improve it if you like as I’m fairly certain it’s laced with bugs.