Friday, September 6, 2013

RxC++ ReactiveCommand

I rewrote the ReactiveCommand class (from Paul Betts' ReactiveUI library for C#) to demonstrate how to bind async commands to Xaml Buttons. This is a work in progress: it does not use data binding yet, it is missing some error handling, and there is no reason for it to be specific to the Xaml UI framework (it should work with a WIN32 button, etc.). I did build it for use in this Windows 8.1 sample, though.

ReactiveCommand models calls to the Execute function as a Sequence of the values passed to each Execute. All subscriptions to it are called on the UI thread. There can be many subscriptions made directly to a ReactiveCommand, and many async functions can be registered as well.

Each registered async function takes the value passed to Execute, does work, and returns a result. Each call to RegisterAsyncFunction returns a new Sequence of the results from each call to the registered function. All subscriptions to the Sequence of results are called on the UI thread.

ReactiveCommand also provides Sequences of bools that represent its state and can be subscribed to. When the CanExecute Sequence is bound to the enabled property of the button, the state of the ReactiveCommand is communicated to the UI.

By default, ReactiveCommand ignores calls to Execute until all the subscribers (sync and async) to the previous Execute call have finished. This can be changed during construction.
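
To make the "ignore Execute while busy" rule concrete, here is a minimal sketch in standard C++. MiniCommand, its allowConcurrent flag and countDelivered are hypothetical names invented for this illustration; this is not the rxrt::ReactiveCommand implementation, and it only models synchronous subscribers.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a ReactiveCommand-like type (not the rxrt API).
template <class T>
class MiniCommand {
public:
    explicit MiniCommand(bool allowConcurrent = false)
        : allowConcurrent_(allowConcurrent) {}

    // Register a subscriber that is called with each executed value.
    void Subscribe(std::function<void(const T&)> onNext) {
        subscribers_.push_back(std::move(onNext));
    }

    // True when a call to Execute would be delivered to subscribers.
    bool CanExecute() const { return allowConcurrent_ || inFlight_ == 0; }

    // Returns false when the call is ignored because a previous
    // Execute has not finished yet.
    bool Execute(const T& value) {
        if (!CanExecute()) return false;
        ++inFlight_;
        for (auto& s : subscribers_) s(value);
        --inFlight_;
        return true;
    }

private:
    bool allowConcurrent_;
    int inFlight_ = 0;
    std::vector<std::function<void(const T&)>> subscribers_;
};

// Counts how many values reach the subscriber when the subscriber
// itself calls Execute again while the first call is still running.
int countDelivered(bool allowConcurrent) {
    MiniCommand<int> cmd(allowConcurrent);
    int delivered = 0;
    cmd.Subscribe([&](const int& v) {
        ++delivered;
        if (v == 1) cmd.Execute(2); // nested call while 1 is in flight
    });
    cmd.Execute(1);
    assert(cmd.CanExecute()); // idle again once Execute returns
    return delivered;
}
```

With the default construction the nested Execute is dropped, so only one value is delivered; constructing with allowConcurrent set to true lets both through.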

Here is some (out-of-order) usage from the sample.

Subscription - async version (Scenario1.Xaml.cpp):
from(enable->RegisterAsyncFunction(
    [](RoutedEventPattern)
    {
        // background thread
        // enable and disable commands will be disabled until this is finished
        std::this_thread::sleep_for(std::chrono::seconds(2));
        return true;
    })) // this is a subscription to the enable ReactiveCommand
    // back on the ui thread
    .subscribe([this](bool) // takes whatever was returned above
    {
        // update the ui
    });

Subscription - sync version (Scenario1.Xaml.cpp):
// enable the scenario when enable is executed
from(observable(enable))
    .where([this](RoutedEventPattern)
    {
        return accelerometer != nullptr;
    })
    .select_many([=](RoutedEventPattern)
    {
        return from(visible)
            .select_many([=](bool)
            {
                // enable sensor input
                return rx::from(readingChanged)
                    .take_until(invisible);
            })
            .take_until(endScenario); // this is a subscription to the disable ReactiveCommand
    })
    .subscribe([this](AccelerometerReading^ reading)
    {
        // on the ui thread
        this->ScenarioOutput_X->Text = reading->AccelerationX.ToString();
        this->ScenarioOutput_Y->Text = reading->AccelerationY.ToString();
        this->ScenarioOutput_Z->Text = reading->AccelerationZ.ToString();
    });

Binding (Scenario1.Xaml.cpp):
rxrt::BindCommand(ScenarioEnableButton, enable);
rxrt::BindCommand(ScenarioDisableButton, disable);

Creation (Scenario1.Xaml.cpp):
// start out disabled
auto enabled = std::make_shared<rx::BehaviorSubject<bool>>(false);
// start out not-working
auto working = std::make_shared<rx::BehaviorSubject<bool>>(false);
// use !enabled and !working to control canExecute
enable = std::make_shared<rxrt::ReactiveCommand<RoutedEventPattern>>(observable(from(enabled)
    .combine_latest([](bool e, bool w)
    {
        return !e && !w;
    }, working)));
// use enabled and !working to control canExecute
disable = std::make_shared<rxrt::ReactiveCommand<RoutedEventPattern>>(observable(from(enabled)
    .combine_latest([](bool e, bool w)
    {
        return e && !w;
    }, working)));
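
The creation code above relies on combine_latest to recompute canExecute whenever either enabled or working changes. The following is a rough sketch of that behavior in standard C++, not the RxC++ operator itself. BoolCell, OnChange, CombineLatest and demoCanEnable are hypothetical names; the sketch assumes both inputs always hold a current value, as a BehaviorSubject does.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical cell that remembers its latest value, loosely modeled
// on a BehaviorSubject<bool>. Not part of RxC++.
class BoolCell {
public:
    explicit BoolCell(bool initial) : value_(initial) {}
    bool Value() const { return value_; }
    // Store the new value and notify listeners.
    void OnNext(bool v) {
        value_ = v;
        for (auto& l : listeners_) l(v);
    }
    // Register a listener for future changes (no replay of the current value).
    void OnChange(std::function<void(bool)> l) { listeners_.push_back(std::move(l)); }
private:
    bool value_;
    std::vector<std::function<void(bool)>> listeners_;
};

// Calls out(combine(a, b)) once for the initial values and again
// whenever either cell changes. The cells must outlive the listeners.
void CombineLatest(BoolCell& a, BoolCell& b,
                   std::function<bool(bool, bool)> combine,
                   std::function<void(bool)> out) {
    auto emit = [&a, &b, combine, out](bool) {
        out(combine(a.Value(), b.Value()));
    };
    a.OnChange(emit);
    b.OnChange(emit);
    emit(false); // initial combination; the argument is unused
}

// Records the canExecute values of an "enable" command over time.
std::vector<bool> demoCanEnable() {
    BoolCell enabled(false), working(false);
    std::vector<bool> seen;
    CombineLatest(enabled, working,
        [](bool e, bool w) { return !e && !w; },
        [&seen](bool canExecute) { seen.push_back(canExecute); });
    working.OnNext(true);  // a command started executing
    working.OnNext(false); // it finished
    enabled.OnNext(true);  // the scenario is now enabled
    return seen;
}
```

In this sketch the enable command can execute only while the scenario is disabled and nothing is working, which mirrors the !e && !w lambda in the sample.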

Create state sequences that are used by these ReactiveCommands (Scenario1.Xaml.cpp):
// when enable or disable is executing mark as working (both commands should be disabled)
observable(from(enable->IsExecuting())
    .combine_latest([](bool ew, bool dw)
    {
        return ew || dw;
    }, disable->IsExecuting()))
    ->Subscribe(observer(working));
// when enable is executed mark the scenario enabled, when disable is executed mark the scenario disabled
observable(from(observable(enable))
    .select([this](RoutedEventPattern)
    {
        return accelerometer != nullptr;
    })
    .merge(observable(from(observable(disable))
        .select([](RoutedEventPattern)
        {
            return false;
        }))))
    ->Subscribe(observer(enabled)); // this is a subscription to the enable and disable ReactiveCommands
typedef TypedEventHandler<Accelerometer^, AccelerometerReadingChangedEventArgs^> AccelerometerReadingChangedTypedEventHandler;
auto readingChanged = from(rxrt::FromEventPattern<AccelerometerReadingChangedTypedEventHandler>(
    [this](AccelerometerReadingChangedTypedEventHandler^ h)
    {
        return this->accelerometer->ReadingChanged += h;
    },
    [this](Windows::Foundation::EventRegistrationToken t)
    {
        this->accelerometer->ReadingChanged -= t;
    }))
    .select([](rxrt::EventPattern<Accelerometer^, AccelerometerReadingChangedEventArgs^> e)
    {
        // on sensor thread
        return e.EventArgs()->Reading;
    })
    // push readings to ui thread
    .observe_on_dispatcher()
    .publish()
    .ref_count();
auto currentWindow = Window::Current;
auto visiblityChanged = from(rxrt::FromEventPattern<WindowVisibilityChangedEventHandler, VisibilityChangedEventArgs>(
    [currentWindow](WindowVisibilityChangedEventHandler^ h)
    {
        return currentWindow->VisibilityChanged += h;
    },
    [currentWindow](Windows::Foundation::EventRegistrationToken t)
    {
        currentWindow->VisibilityChanged -= t;
    }))
    .select([](rxrt::EventPattern<Platform::Object^, VisibilityChangedEventArgs^> e)
    {
        return e.EventArgs()->Visible;
    })
    .publish()
    .ref_count();
auto visible = from(visiblityChanged)
    .where([](bool v)
    {
        return !!v;
    }).merge(
        from(observable(navigated))
        .where([](bool n)
        {
            return n;
        }));
auto invisible = from(visiblityChanged)
    .where([](bool v)
    {
        return !v;
    });
// the scenario ends when:
auto endScenario =
    // - disable is executed
    from(observable(disable))
    .select([](RoutedEventPattern)
    {
        return true;
    }).merge(
        // - the scenario is navigated from
        from(observable(navigated))
        .where([](bool n)
        {
            return !n;
        }));

Thursday, September 5, 2013

DeferOperation

I have been working on RxC++ for a few months now. I have added Schedulers and lots of operators. Now I am trying to apply Rx to a C++ Windows Store app.

I was told that Deferrals did not compose well with Rx. This motivated me to design the DeferOperation operator.

DeferOperation demonstrates how Rx can be extended by a few engineers to provide services for many. Another way to say this is that Rx allows Async Algorithms to be written once, carefully, so that many others can re-use them.


These are all references to the Windows 8.1 Accelerometer sample I am modifying.

Usage (App.Xaml.Cpp):
auto ct = std::make_shared<rx::CurrentThreadScheduler>();
typedef rxrt::EventPattern<Platform::Object^, SuspendingEventArgs^> SuspendingEventPattern;
rx::from(suspending)
    .chain<rxrt::defer_operation>(
        [](SuspendingEventPattern ep)
        {
            // defer this operation
            return ep.EventArgs()->SuspendingOperation;
        },
        [](rxrt::OperationPattern<SuspendingOperation^>, SuspendingEventPattern)
        {
            // do this while the operation is deferred
            return SuspensionManager::ReactiveSave();
        },
        ct)
    .publish()
    .connect_forever();

DeferOperation defaults to the UI thread by reading the context off of the Window, but the Window is not valid when the suspending event is fired, so this usage passes the current thread scheduler explicitly. publish and connect_forever ensure that the work is done even if nothing subscribes to the result.


The implementation follows.

The core of the DeferOperation operator is (cpprx/rx-winrt.hpp):
// must take the deferral early while the event is still on the stack.
auto op = make_operation_pattern(sop(t));
typedef decltype(op) OP;
return Using(
    // resource factory
    [=]()
    {
        return op;
    },
    // observable factory
    [sob, t](OP op)
    {
        return sob(op, t);
    });

sop is the first lambda from the usage (cpprx/rx-winrt.hpp):
[](SuspendingEventPattern ep)
{
    // defer this operation
    return ep.EventArgs()->SuspendingOperation;
}

sob is the second lambda from the usage. The OperationPattern is passed so that the SuspendingOperation and the Deferral are available to the sequence, for example if it wants to see the Deadline or call Complete early (cpprx/rx-winrt.hpp):
[](rxrt::OperationPattern<SuspendingOperation^>, SuspendingEventPattern)
{
    // do this while the operation is deferred
    return SuspensionManager::ReactiveSave();
}

Using will retrieve the resource (OperationPattern), subscribe to the result of sob and Dispose the resource when the subscription completes or errors.
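
The acquire, subscribe, dispose lifecycle described above can be sketched in standard C++ as follows. This is an illustration under simplifying assumptions, not the rx Using operator: the "sequence" here is a synchronous callable, an error is modeled as a thrown exception, and UsingSketch and demoUsing are hypothetical names.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical, synchronous sketch of the Using lifecycle:
// 1. get the resource, 2. run the sequence built from it,
// 3. dispose the resource on completion or on error.
template <class ResourceFactory, class SourceFactory, class Dispose>
std::vector<std::string> UsingSketch(ResourceFactory makeResource,
                                     SourceFactory makeSource,
                                     Dispose dispose) {
    std::vector<std::string> log;
    auto resource = makeResource();
    log.push_back("acquired");
    try {
        // The source calls onNext for each value and returns when complete.
        makeSource(resource)([&log](int v) {
            log.push_back("next " + std::to_string(v));
        });
        log.push_back("completed");
    } catch (const std::exception& e) {
        log.push_back(std::string("error ") + e.what());
    }
    dispose(resource); // runs on both paths
    log.push_back("disposed");
    return log;
}

// Runs a one-value sequence that optionally fails after the value.
std::vector<std::string> demoUsing(bool fail) {
    return UsingSketch(
        [] { return 42; }, // stand-in for taking a deferral
        [fail](int) {
            return [fail](const std::function<void(int)>& onNext) {
                onNext(1);
                if (fail) throw std::runtime_error("save failed");
            };
        },
        [](int) { /* stand-in for deferral->Complete() */ });
}
```

Either way the resource is released exactly once, which is the property DeferOperation needs so that the deferral is always completed.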

OperationPattern provides the abstraction needed by Using. It gets the deferral when it is constructed and completes the deferral when it is disposed (cpprx/rx-winrt.hpp):
template <class O>
struct OperationPattern
{
    typedef decltype(((O)nullptr)->GetDeferral()) D;
    OperationPattern(O operation) :
        operation(operation),
        deferral(operation->GetDeferral())
    {
    }
    O Operation() const {
        return operation;
    }
    D Deferral() const {
        return deferral;
    }
    void Dispose()
    {
        deferral->Complete();
    }
    operator Disposable() const
    {
        // make sure to capture state and not 'this'.
        // usage means that 'this' will usually be destructed
        // immediately
        auto local = deferral;
        return Disposable([local]{
            local->Complete();
        });
    }
private:
    O operation;
    D deferral;
};
template <class O>
OperationPattern<O> make_operation_pattern(O o)
{
    return OperationPattern<O>(std::move(o));
}
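
The comment about capturing state instead of 'this' is worth a small standalone sketch. In the standard C++ below, FakeDeferral, Pattern and demoDisposeAfterTemporary are hypothetical stand-ins (a std::shared_ptr plays the role of the ref-counted deferral handle); the point is only that the returned disposable stays valid after the temporary that produced it is gone.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical stand-in for a WinRT deferral object.
struct FakeDeferral {
    bool completed = false;
    void Complete() { completed = true; }
};

// Hypothetical stand-in for OperationPattern.
class Pattern {
public:
    explicit Pattern(std::shared_ptr<FakeDeferral> d) : deferral_(std::move(d)) {}

    // Copies the shared state into the closure. Capturing `this`
    // instead would dangle once the Pattern is destroyed.
    std::function<void()> AsDisposable() const {
        auto local = deferral_;
        return [local] { local->Complete(); };
    }

private:
    std::shared_ptr<FakeDeferral> deferral_;
};

// Disposes after the Pattern that created the disposable is gone.
bool demoDisposeAfterTemporary() {
    auto deferral = std::make_shared<FakeDeferral>();
    std::function<void()> dispose;
    {
        Pattern temp(deferral);
        dispose = temp.AsDisposable();
    } // temp is destroyed here
    assert(!deferral->completed);
    dispose(); // still safe: the closure owns its own copy of the state
    return deferral->completed;
}
```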
 

Friday, August 30, 2013

Reactive Extensions for C++

I get excited when I can compose asynchronous code without using thread or lock APIs.

A C++/CX Windows 8.1 sample app that demonstrates using the accelerometer.

Now see it transformed using Rx++
https://github.com/kirkshoop/rxaccelerometer

Examples 

Create a ReactiveCommand and bind it to a Xaml button
typedef rxrt::EventPattern<Object^, Windows::UI::Xaml::RoutedEventArgs^> RoutedEventPattern;
// start out disabled
enabled = std::make_shared<rx::BehaviorSubject<bool>>(false);
// use enabled to control canExecute
disable = std::make_shared<rxrt::ReactiveCommand<RoutedEventPattern>>(observable(enabled));
from(observable(disable))
    // stay on the ui thread
    .subscribe([this](RoutedEventPattern)
    // on click
    {
        ...
        // now the scenario is disabled
        this->enabled->OnNext(false);
    });
rxrt::BindCommand(ScenarioDisableButton, disable);


Transform SaveAsync method to ReactiveSave
std::shared_ptr<rx::Observable<bool>> SuspensionManager::ReactiveSave(void)
{
    ...
    // Serialize the session state synchronously to avoid asynchronous access to shared
    // state
    auto sessionData = ref new InMemoryRandomAccessStream();
    auto sessionDataWriter = ref new DataWriter(sessionData->GetOutputStreamAt(0));
    WriteObject(sessionDataWriter, _sessionState);
    // one-time construction of reactive functions needed to save.
    auto reactiveStore = rxrt::FromAsyncPattern<DataWriter^>(
        [](DataWriter^ dw){
            return dw->StoreAsync(); });
    auto reactiveFlush = rxrt::FromAsyncPattern<DataWriter^>(
        [](DataWriter^ dw){
            return dw->FlushAsync(); });
    auto reactiveCreateFile = rxrt::FromAsyncPattern<StorageFolder^, String^, CreationCollisionOption>(
        [](StorageFolder^ folder, String^ name, CreationCollisionOption option){
            return folder->CreateFileAsync(name, option); });
    auto reactiveOpenFile = rxrt::FromAsyncPattern<StorageFile^, FileAccessMode>(
        [](StorageFile^ f, FileAccessMode mode){
            return f->OpenAsync(mode); });
    auto reactiveCopy = rxrt::FromAsyncPattern<IInputStream^, IOutputStream^>(
        [](IInputStream^ in, IOutputStream^ out){
            return RandomAccessStream::CopyAndCloseAsync(in, out); });
    // Begin the asynchronous process
    // of writing the result to disk
    return observable(from(reactiveStore(sessionDataWriter))
        .select_many([=](unsigned int){
            return reactiveFlush(sessionDataWriter); })
        .select_many([=](Boolean){
            return reactiveCreateFile(
                ApplicationData::Current->LocalFolder,
                sessionStateFilename,
                CreationCollisionOption::ReplaceExisting); })
        .select_many([=](StorageFile^ file){
            return reactiveOpenFile(file, FileAccessMode::ReadWrite); })
        .select_many([=](IRandomAccessStream^ stream){
            return reactiveCopy(
                sessionData->GetInputStreamAt(0),
                stream->GetOutputStreamAt(0)); })
        .select([](UINT64){ // convert to bool observable
            return true; }) // success! call onnext with true
        .publish(false) // only save once even if the caller subscribes
                        // more than once. initially onnext will be called with false
        .connect_forever()); // save now, even if the caller does not subscribe
}

Introduction to Rx
Reactive Extensions (Rx) were born as a transformation of LINQ expressions, in which a set of operators pulls from a data source, into Rx expressions, in which the data source pushes through a set of operators.

Personally, I recognized them as the generalization I wanted to make of a pattern I had popularized in a previous project. We called them Sender/Receiver; Rx calls them Observable/Observer.

Observable and Observer
An Observable is a source of data. An Observer operates on data as the source provides it. An Observer connects to an Observable by Subscribing to it. Many Observers can connect to the same Observable. 

A Subject is both an Observable and an Observer. It passes the data sent to it (as an Observer) on to each of the Observers that are Subscribed to it (as an Observable).
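
As a rough illustration of these definitions, here is a minimal Subject in standard C++. MiniSubject and demoSubject are hypothetical names for this sketch and are not the rx::Subject from RxC++; completion, errors and unsubscription are left out.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical minimal Subject: values pushed in through OnNext
// (the Observer side) are forwarded to everything that called
// Subscribe (the Observable side).
template <class T>
class MiniSubject {
public:
    // Observable side: connect an Observer.
    void Subscribe(std::function<void(const T&)> observer) {
        observers_.push_back(std::move(observer));
    }
    // Observer side: receive a value and pass it on.
    void OnNext(const T& value) {
        for (auto& o : observers_) o(value);
    }
private:
    std::vector<std::function<void(const T&)>> observers_;
};

// Two Observers connect to the same source at different times.
// Returns the sums seen by the first and second Observer as a*10 + b.
int demoSubject() {
    MiniSubject<int> subject;
    int a = 0, b = 0;
    subject.Subscribe([&a](const int& v) { a += v; });
    subject.OnNext(1); // only the first Observer is subscribed
    subject.Subscribe([&b](const int& v) { b += v; });
    subject.OnNext(2); // both Observers receive 2
    assert(a == 3 && b == 2);
    return a * 10 + b;
}
```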

An Operator is a function that takes at least one Observable argument and returns a new Observable.
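
A small sketch of that definition in standard C++, assuming a deliberately simplified model where an Observable is just a callable that pushes values into a callback. Observable, FromValues, Where, Select and demoOperators are hypothetical names for this illustration, not the RxC++ types or operators.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Simplified model: subscribing means calling the Observable with an onNext callback.
template <class T>
using Observable = std::function<void(std::function<void(const T&)>)>;

// A source that pushes each stored value to its subscriber.
template <class T>
Observable<T> FromValues(std::vector<T> values) {
    return [values](std::function<void(const T&)> onNext) {
        for (const auto& v : values) onNext(v);
    };
}

// Operator: takes an Observable, returns a new one that only passes matching values.
template <class T, class P>
Observable<T> Where(Observable<T> source, P predicate) {
    return [source, predicate](std::function<void(const T&)> onNext) {
        source([onNext, predicate](const T& v) {
            if (predicate(v)) onNext(v);
        });
    };
}

// Operator: takes an Observable, returns a new one with each value transformed.
template <class R, class T, class F>
Observable<R> Select(Observable<T> source, F transform) {
    return [source, transform](std::function<void(const R&)> onNext) {
        source([onNext, transform](const T& v) {
            onNext(transform(v));
        });
    };
}

// Composes the operators and collects what the final Observable pushes.
std::vector<int> demoOperators() {
    auto source = FromValues<int>({1, 2, 3, 4});
    auto evens = Where<int>(source, [](const int& v) { return v % 2 == 0; });
    auto scaled = Select<int>(evens, [](const int& v) { return v * 10; });
    std::vector<int> out;
    scaled([&out](const int& v) { out.push_back(v); });
    return out;
}
```

Because each operator returns a new Observable, the calls compose in the same way the where and select chains do in the samples above.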

Web Resources
There is a lot of information about Rx. Most of the information is focused on the .Net version as that came first. More recently there have been other versions built.

A partial list of Rx implementations:
Aaron Lahmann, a friend (and one-time co-worker on the team where I was working on Sender/Receiver), wrote Rx++ and was talking to me about it back then. I did not yet know enough about Rx to jump on it.


Eventually, I saw an article by Jafar Husain that linked to an RxJS tutorial that got me excited. I highly recommend taking the time to follow through the examples.