Tuesday, April 12, 2011

Generalized IEnumerable to chunked IEnumerable in F#

F# sequence expressions, under the covers, are complicated beasts; for the purposes of today's exercise, the factor of interest is that they Dispose() internal enumerators aggressively, unlike in the C# case, so the simple Ratchet wrapper isn't enough by itself. So, with some refactoring, we get

namespace Tinesware.Enumerables

open System
open System.Collections.Generic
open System.IO
open System.Linq
open Should.Fluent

module Chunk =
  /// <summary>
  /// Turns a stream into a byte enumeration
  /// </summary>
  /// <param name="stream">The stream to wrap</param>
  /// <returns>The stream as an enumeration</returns>
  let rec ToEnumerable (stream : Stream) =
    seq {
      let result = stream.ReadByte()
      if result >= 0 then
        yield (byte result)
        yield! ToEnumerable stream
      stream.Close()
    }

  type private Controlled<'a>(iterator: IEnumerator<'a>) =
    interface IEnumerator<'a> with
      member self.Current = iterator.Current
    interface System.Collections.IEnumerator with
      member self.Current = iterator.Current :> Object
      member self.Reset() = iterator.Reset()
      member self.MoveNext() = iterator.MoveNext()
    interface IDisposable with
      member self.Dispose() = () // The whole purpose of the type -- prevent premature disposal

  // Hands out the same enumerator every time, so each enumeration resumes where the last one stopped
  type private Ratchet<'a>(iterator: IEnumerator<'a>) =
    interface IEnumerable<'a> with
      member self.GetEnumerator() = iterator
    interface System.Collections.IEnumerable with
      member self.GetEnumerator() = iterator :> System.Collections.IEnumerator

  /// <summary>
  /// Split an enumerable into an enumeration of arrays
  /// </summary>
  /// <typeparam name="a">Element type of the array</typeparam>
  /// <param name="chunk">The window size</param>
  /// <param name="source">The IEnumerable to operate on</param>
  /// <returns>The enumeration</returns>
  let Window (chunk : int) (source : 'a seq) =
    let rec innerWindowing x =
      seq {
        let result = x |> Seq.truncate chunk |> Seq.toArray
        if result.Length > 0 then
          yield result
          yield! innerWindowing x
      }
    innerWindowing <| new Ratchet<'a>(new Controlled<'a>(source.GetEnumerator()))

  /// <summary>
  /// A Self-test program
  /// </summary>
  [<EntryPoint>]
  let main a =
    let input =
      seq { 0 .. 41 }
      |> Seq.map byte
      |> Seq.toArray

    // Enumerating the same stream-backed sequence repeatedly resumes at the stream position
    let inStream = new MemoryStream(input)
    let channel = inStream |> ToEnumerable
    let chunk = channel |> Seq.truncate 16 |> Seq.toArray
    chunk.Length.Should().Equal(16) |> ignore
    chunk.[0].Should().Equal((byte)0) |> ignore
    let chunk = channel |> Seq.truncate 16 |> Seq.toArray
    chunk.Length.Should().Equal(16) |> ignore
    chunk.[0].Should().Equal((byte)16) |> ignore
    let chunk = channel |> Seq.truncate 16 |> Seq.toArray
    chunk.Length.Should().Equal(10) |> ignore
    chunk.[0].Should().Equal((byte)32) |> ignore

    // Chunking an in-memory array
    let chunks = input |> Window 16 |> Seq.toArray
    chunks.Length.Should().Equal(3) |> ignore
    chunks.[0].Length.Should().Equal(16) |> ignore
    chunks.[1].Length.Should().Equal(16) |> ignore
    chunks.[2].Length.Should().Equal(10) |> ignore

    // Chunking a stream
    let inStream = new MemoryStream(input)
    let chunks = inStream |> ToEnumerable |> Window 16 |> Seq.toArray
    chunks.Length.Should().Equal(3) |> ignore
    chunks.[0].Length.Should().Equal(16) |> ignore
    chunks.[1].Length.Should().Equal(16) |> ignore
    chunks.[2].Length.Should().Equal(10) |> ignore
    0
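To see the Ratchet-plus-Controlled idea in isolation, here is a minimal sketch using object expressions instead of the two private types. The helper name `resumable` is hypothetical and not part of the code above; it only assumes that Seq.truncate stops pulling once it has its count, which is the behaviour the self-test relies on.

```fsharp
open System
open System.Collections
open System.Collections.Generic

/// Hypothetical helper (not part of the post's code): exposes one shared
/// enumerator as a sequence and ignores Dispose, so consumers cannot end it early.
let resumable (source : seq<'a>) : seq<'a> =
    let inner = source.GetEnumerator()
    let shielded =
        { new IEnumerator<'a> with
              member this.Current = inner.Current
          interface IEnumerator with
              member this.Current = box inner.Current
              member this.MoveNext() = inner.MoveNext()
              member this.Reset() = inner.Reset()
          interface IDisposable with
              member this.Dispose() = () } // swallow disposal, as Controlled does
    { new IEnumerable<'a> with
          member this.GetEnumerator() = shielded
      interface IEnumerable with
          member this.GetEnumerator() = shielded :> IEnumerator }

// An ordinary sequence restarts on every enumeration.
let plain = seq { 0 .. 9 }
let plainFirst = plain |> Seq.truncate 4 |> Seq.toArray    // [|0; 1; 2; 3|]
let plainSecond = plain |> Seq.truncate 4 |> Seq.toArray   // [|0; 1; 2; 3|] again

// The shielded, shared enumerator carries on from where it stopped.
let ratcheted = resumable (seq { 0 .. 9 })
let first = ratcheted |> Seq.truncate 4 |> Seq.toArray     // [|0; 1; 2; 3|]
let second = ratcheted |> Seq.truncate 4 |> Seq.toArray    // [|4; 5; 6; 7|]
let third = ratcheted |> Seq.truncate 4 |> Seq.toArray     // [|8; 9|]
```

The price of this design is that the wrapped sequence is single-use: once the shared enumerator is exhausted, every later enumeration comes back empty.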

Note that this version closes the stream after the iteration completes -- we could leave the management of the stream entirely outside the iterator if we wished. Since we are re-using our Enumerator instance, we only have the one left for garbage collection, so suppressing the disposal doesn't litter too badly.
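If the stream were to be managed entirely outside the iterator, one possible shape is sketched below. Both `toEnumerableUnowned` and `readAll` are hypothetical names introduced for illustration; the sketch swaps the recursive sequence expression for Seq.unfold and leaves closing the stream to the caller.

```fsharp
open System.IO

/// Hypothetical variant of ToEnumerable: yields bytes until end of stream,
/// but never closes the stream itself.
let toEnumerableUnowned (stream : Stream) : seq<byte> =
    Seq.unfold
        (fun () ->
            match stream.ReadByte() with
            | b when b >= 0 -> Some (byte b, ())
            | _ -> None)
        ()

/// The caller owns the stream; `use` disposes it when the function returns,
/// after Seq.toArray has already forced the whole enumeration.
let readAll (data : byte[]) : byte[] =
    use stream = new MemoryStream(data)
    stream |> toEnumerableUnowned |> Seq.toArray
```

With this split the sequence must be fully consumed before the `use` scope ends, since a lazily evaluated sequence that escapes the scope would read from a disposed stream.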

Overall, if you're in the simple use-case of pulling chunks out of a file in order, the earlier version of the system will suffice.


Edit 1-May-11 : Comparing with 2#4u's Seq.breakBy I get on my laptop

  • Real: 00:00:03.948, CPU: 00:00:03.946 for Seq.breakByV1
  • Real: 00:00:00.345, CPU: 00:00:00.374 for Seq.breakByV2
  • Real: 00:00:00.910, CPU: 00:00:00.904 for Chunk.Window

making the functional Chunk.Window implementation about 2.4 times slower by CPU time (about 2.6 times by wall-clock time) than the faster -- but more internally mutable -- of the two.

3 comments :

Mark Rockmann said...

Two versions and still the wrong code... Repeatedly calling Seq.truncate (or Enumerable.Take) will always yield the same elements. Btw, what about Seq.windowed?

Steve Gilham said...

1) What do you think Ratchet and Controlled are doing?

2) If you try that code, you will see that it passes the tests.

3) As I said earlier, "I may have missed these in F#"

Steve Gilham said...

4) Seq.windowed doesn't do what Chunk.Window does. windowed slides a window through the sequence one element at a time (useful for doing moving averages); Window returns the sequence partitioned into disjoint adjacent sections (useful for breaking a file into fixed size pieces to process).

Doing this the TDD way, define input as above and go

let chunks = input |> Seq.windowed 16 |> Seq.toArray
chunks.Length;;

to get val it : int = 27, which is 42-16+1, and not the value 3 = (42+16-1)/16 that disjoint chunks give.
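The two counts can be checked side by side. A small sketch, assuming a newer FSharp.Core than the one this post was written against: Seq.chunkBySize arrived in F# 4.0, to the best of my knowledge, and partitions a sequence into disjoint arrays much as Chunk.Window does.

```fsharp
// 42 bytes, 0 through 41, as in the self-test above.
let input = [| 0uy .. 41uy |]

// Sliding windows: one per starting position that still has 16 elements after it.
let sliding = input |> Seq.windowed 16 |> Seq.toArray

// Disjoint chunks: the last one holds whatever is left over.
let disjoint = input |> Seq.chunkBySize 16 |> Seq.toArray

printfn "%d windows, %d chunks" sliding.Length disjoint.Length   // 27 windows, 3 chunks
printfn "%A" (disjoint |> Array.map Array.length)                // [|16; 16; 10|]
```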