I get excited when I can compose asynchronous code without using thread or lock APIs.
A C++/CX Windows 8.1 sample app that demonstrates using the accelerometer.
Now see it transformed using Rx++:
https://github.com/kirkshoop/rxaccelerometer
Examples
Create a ReactiveCommand and bind it to a Xaml button
typedef rxrt::EventPattern<Object^, Windows::UI::Xaml::RoutedEventArgs^> RoutedEventPattern;

// start out disabled
enabled = std::make_shared<rx::BehaviorSubject<bool>>(false);

// use enabled to control canExecute
disable = std::make_shared<rxrt::ReactiveCommand<RoutedEventPattern>>(observable(enabled));

from(observable(disable))
    // stay on the ui thread
    .subscribe([this](RoutedEventPattern)
    // on click
    {
        ...
        // now the scenario is disabled
        this->enabled->OnNext(false);
    });

rxrt::BindCommand(ScenarioDisableButton, disable);
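To make the role of `enabled` concrete, here is a minimal sketch in standard C++, assuming `rx::BehaviorSubject` follows the usual Rx behavior of remembering the latest value and replaying it to new subscribers. `LatestValueSubject`, `GatedCommand`, and `demo_gated_command` are hypothetical names made up for this illustration; they are not part of Rx++ or of the sample app.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a behavior subject: remembers the latest value
// and hands it to each new subscriber immediately.
template <class T>
class LatestValueSubject {
public:
    explicit LatestValueSubject(T initial) : latest_(std::move(initial)) {}

    void subscribe(std::function<void(const T&)> observer) {
        observer(latest_);  // replay the current value to the new subscriber
        observers_.push_back(std::move(observer));
    }

    void on_next(const T& value) {
        latest_ = value;
        for (const auto& observer : observers_) observer(value);
    }

private:
    T latest_;
    std::vector<std::function<void(const T&)>> observers_;
};

// Hypothetical command whose can-execute flag follows a bool subject.
struct GatedCommand {
    bool can_execute = false;
    int executed = 0;
    void execute() { if (can_execute) ++executed; }
};

// Returns how many times the command actually ran.
int demo_gated_command() {
    LatestValueSubject<bool> enabled(false);  // start out disabled
    GatedCommand command;
    enabled.subscribe([&command](const bool& on) { command.can_execute = on; });
    assert(!command.can_execute);  // the initial value was replayed on subscribe

    command.execute();       // ignored: still disabled
    enabled.on_next(true);
    command.execute();       // runs
    enabled.on_next(false);  // the scenario is disabled again
    command.execute();       // ignored
    return command.executed;
}
```

Calling `demo_gated_command()` from a `main` function runs the command exactly once, while the subject holds `true`. The point of the design is that the button state is driven by pushing values into a subject rather than by setting the flag directly.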
Transform SaveAsync method to ReactiveSave
std::shared_ptr<rx::Observable<bool>> SuspensionManager::ReactiveSave(void)
{
    ...
    // Serialize the session state synchronously to avoid asynchronous access to shared
    // state
    auto sessionData = ref new InMemoryRandomAccessStream();
    auto sessionDataWriter = ref new DataWriter(sessionData->GetOutputStreamAt(0));
    WriteObject(sessionDataWriter, _sessionState);

    // one-time construction of reactive functions needed to save.
    auto reactiveStore = rxrt::FromAsyncPattern<DataWriter^>(
        [](DataWriter^ dw){
            return dw->StoreAsync(); });
    auto reactiveFlush = rxrt::FromAsyncPattern<DataWriter^>(
        [](DataWriter^ dw){
            return dw->FlushAsync(); });
    auto reactiveCreateFile = rxrt::FromAsyncPattern<StorageFolder^, String^, CreationCollisionOption>(
        [](StorageFolder^ folder, String^ name, CreationCollisionOption option){
            return folder->CreateFileAsync(name, option); });
    auto reactiveOpenFile = rxrt::FromAsyncPattern<StorageFile^, FileAccessMode>(
        [](StorageFile^ f, FileAccessMode mode){
            return f->OpenAsync(mode); });
    auto reactiveCopy = rxrt::FromAsyncPattern<IInputStream^, IOutputStream^>(
        [](IInputStream^ in, IOutputStream^ out){
            return RandomAccessStream::CopyAndCloseAsync(in, out); });

    // Begin the asynchronous process
    // of writing the result to disk
    return observable(from(reactiveStore(sessionDataWriter))
        .select_many([=](unsigned int){
            return reactiveFlush(sessionDataWriter); })
        .select_many([=](Boolean){
            return reactiveCreateFile(
                ApplicationData::Current->LocalFolder,
                sessionStateFilename,
                CreationCollisionOption::ReplaceExisting); })
        .select_many([=](StorageFile^ file){
            return reactiveOpenFile(file, FileAccessMode::ReadWrite); })
        .select_many([=](IRandomAccessStream^ stream){
            return reactiveCopy(
                sessionData->GetInputStreamAt(0),
                stream->GetOutputStreamAt(0)); })
        .select([](UINT64){ // convert to bool observable
            return true; }) // success! call onnext with true
        .publish(false) // only save once even if the caller subscribes
                        // more than once. initially onnext will be called with false
        .connect_forever()); // save now, even if the caller does not subscribe
}
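The chain of `select_many` calls above sequences one step after another: each step receives the previous result and returns a new observable. The sketch below shows that idea in standard C++ only. It is an assumption-laden illustration, not the Rx++ implementation: `Observable`, `just`, `select_many`, and `demo_chain` are hypothetical names, and every step completes synchronously, whereas the real WinRT operations complete later.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical minimal observable: subscribing passes in an on_next callback.
template <class T>
struct Observable {
    using Observer = std::function<void(const T&)>;
    std::function<void(Observer)> subscribe;
};

// An observable that produces a single value, standing in for one async call.
template <class T>
Observable<T> just(T value) {
    return Observable<T>{[value](typename Observable<T>::Observer on_next) {
        on_next(value);
    }};
}

// select_many: call `next` on each source value and forward whatever the
// observable returned by `next` produces.
template <class T, class F>
auto select_many(Observable<T> source, F next) {
    using Inner = decltype(next(std::declval<const T&>()));
    using InnerObserver = typename Inner::Observer;
    return Inner{[source, next](InnerObserver on_next) {
        source.subscribe([next, on_next](const T& value) {
            next(value).subscribe(on_next);
        });
    }};
}

// Three chained steps: a count, then a flag, then a status string.
std::string demo_chain() {
    std::string result;
    auto store = just(3);  // plays the role of a step that yields a count
    auto flushed = select_many(store, [](const int& n) { return just(n > 0); });
    auto named = select_many(flushed, [](const bool& ok) {
        return just(std::string(ok ? "saved" : "failed"));
    });
    // Nothing runs until someone subscribes in this sketch.
    named.subscribe([&result](const std::string& s) { result = s; });
    assert(!result.empty());
    return result;
}
```

In this sketch nothing happens until `subscribe` is called, which is why the original code ends with `publish` and `connect_forever`: the save should start immediately and run once, whether or not the caller subscribes.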
Introduction to Rx
Reactive Extensions (Rx) were born as a transformation of LINQ expressions, in which a set of operators pulls from a data source, into Rx expressions, in which the data source pushes through a set of operators.
Personally, I recognized them as the generalization I wanted to make of a pattern I had popularized in a previous project. We called them Sender/Receiver; Rx calls them Observable/Observer.
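The pull-versus-push distinction can be sketched in a few lines of standard C++. This is only an illustration under my own naming: `pull_doubled`, `push_source`, `push_doubled`, and `demo_pull_push` are hypothetical and belong to no Rx library.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Pull: the consumer drives, asking the source for each value in turn.
std::vector<int> pull_doubled(const std::vector<int>& source) {
    std::vector<int> out;
    for (int v : source) out.push_back(v * 2);
    return out;
}

// Push: the source drives, handing each value to a callback when it has one.
// `push_source` is a hypothetical stand-in for an event source.
void push_source(const std::vector<int>& values,
                 const std::function<void(int)>& on_next) {
    for (int v : values) on_next(v);
}

// The same doubling step, written as a reaction to pushed values.
std::vector<int> push_doubled(const std::vector<int>& values) {
    std::vector<int> out;
    push_source(values, [&out](int v) { out.push_back(v * 2); });
    return out;
}

// Both directions of control produce the same result for the same data.
void demo_pull_push() {
    const std::vector<int> values{1, 2, 3};
    assert(pull_doubled(values) == push_doubled(values));
}
```

The operator (doubling) is identical in both versions. What changes is who decides when the next value is processed, and that is what lets the push form handle values that arrive later, such as sensor readings or button clicks.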
Observable and Observer
An Observable is a source of data. An Observer operates on data as the source provides it. An Observer connects to an Observable by Subscribing to it. Many Observers can connect to the same Observable.
A Subject is both an Observable and an Observer: it passes each value it receives as an Observer on to every Observer that is Subscribed to it.
An Operator is a function that takes at least one Observable argument and returns a new Observable.
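The four terms above fit together as in the following sketch, written in standard C++ with hypothetical types. `Observer`, `Observable`, `Subject`, `select`, and `demo_subject` here are simplified stand-ins I made up for illustration (no completion or error signals, no unsubscribe); they are not the Rx++ classes.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// An Observer operates on each value as the source provides it.
template <class T>
using Observer = std::function<void(const T&)>;

// An Observable is anything an Observer can subscribe to.
template <class T>
struct Observable {
    std::function<void(Observer<T>)> subscribe;
};

// A Subject is both: values pushed into on_next go to every subscriber.
template <class T>
class Subject {
public:
    void on_next(const T& value) const {
        for (const auto& observer : *observers_) observer(value);
    }
    Observable<T> as_observable() const {
        auto observers = observers_;  // share the subscriber list
        return Observable<T>{[observers](Observer<T> o) {
            observers->push_back(std::move(o));
        }};
    }

private:
    std::shared_ptr<std::vector<Observer<T>>> observers_ =
        std::make_shared<std::vector<Observer<T>>>();
};

// An Operator takes an Observable and returns a new Observable.
template <class T, class F>
auto select(Observable<T> source, F transform) {
    using U = decltype(transform(std::declval<const T&>()));
    return Observable<U>{[source, transform](Observer<U> o) {
        source.subscribe([o, transform](const T& value) { o(transform(value)); });
    }};
}

// Two observers share one source; the second sees it through an operator.
std::vector<int> demo_subject() {
    std::vector<int> seen;
    Subject<int> subject;
    auto source = subject.as_observable();
    source.subscribe([&seen](const int& v) { seen.push_back(v); });
    select(source, [](const int& v) { return v * 10; })
        .subscribe([&seen](const int& v) { seen.push_back(v); });
    subject.on_next(1);
    subject.on_next(2);
    assert(seen.size() == 4);  // two values, each seen by two observers
    return seen;
}
```

Calling `demo_subject()` records each pushed value twice, once raw and once multiplied by ten, which shows many Observers connected to the same Observable.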
Web Resources
There is a lot of information about Rx. Most of it is focused on the .NET version, as that came first. More recently, versions have been built for other languages and platforms.
A partial list of Rx implementations:
- Rx.NET, RxJS, Rx++ - https://rx.codeplex.com/
- RxJava - https://github.com/Netflix/RxJava
- ReactiveCocoa - https://github.com/ReactiveCocoa/ReactiveCocoa
- Rx.rb - https://github.com/Reactive-Extensions/Rx.rb
- Rx.py - https://rxpy.codeplex.com/
Eventually, I saw an article by Jafar Husain that linked to an RxJS tutorial that got me excited. I highly recommend taking the time to follow through the examples.