Let's start from the beginning:
I'm writing an algorithm for a Silverlight application, which has to do go through a lot different combinations of a high complexity to find an optimum. To give the algorithm the possibility to use all given resources on the client, I decided to do provide a parallel version.
Firstly I wrote my own async event-orientated scheduler class with a wait handle and a blocking object to limit the amount of parallel threads and to wait for all threads at the end, until I fired the final CalculationCompletedEvent (by the way: I was using Backgroundworkers to perform the multithreading). But something there wasn't thread safe, and the amount of returning elements in the result list wasn't constant. I considered not to spend more time in searching for the leak after a colleague pointed me on the Reactive Extensions (rx).
To get an idea of how to use this, I combined an consumer-producer example with some advices on how to use rx (example1 and example2).
This works great, but what I don't understand is: Why do I have to resize the browser to have the listbox updated and show the containing elements of "_receivedStrings"? Once again a tiny stupid neglect?
By the way: If you wouldn't recommend using the rx, give a shot and tell me why and what to use else.
XAML:
<UserControl x:Class="ReactiveTest.MainPage" xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:d="http://schemas.microsoft.com/expression/blend/2008" xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" mc:Ignorable="d" d:DesignHeight="300" d:DesignWidth="400"> <Grid x:Name="LayoutRoot" Background="White"> <Grid.RowDefinitions> <RowDefinition/> <RowDefinition Height="Auto"/> </Grid.RowDefinitions> <ListBox HorizontalAlignment="Stretch" Name="listBox1" VerticalAlignment="Stretch" ItemsSource="{Binding}"/> <Button Grid.Row="1" Content="Klick me!" Width="Auto" Height="Auto" HorizontalAlignment="Center" Click="Button_Click"/> </Grid> </UserControl>
Codebehind:
using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Windows; using System.Windows.Controls; using System.Windows.Documents; using System.Windows.Input; using System.Windows.Media; using System.Windows.Media.Animation; using System.Windows.Shapes; using System.IO; using System.Reactive.Linq; namespace ReactiveTest { public partial class MainPage : UserControl { private int _parallelThreadsAmount; public IList<String> receivedStrings; public MainPage() { InitializeComponent(); receivedStrings = new List<String>(); this._parallelThreadsAmount = 10; this.listBox1.DataContext = receivedStrings; } private void Button_Click(object sender, RoutedEventArgs e) { IList<IObservable<String>> obsCollection = new List<IObservable<String>>(); foreach (var item in forums) { obsCollection.Add(Calculate(item)); } DateTime start = DateTime.Now; obsCollection.Merge(this._parallelThreadsAmount) .Subscribe( y => { receivedStrings.Add( String.Format("{0} - Received: {1}", receivedStrings.Count, y)); }, () => { DateTime end = DateTime.Now; TimeSpan elapsed = end - start; this.receivedStrings.Add( String.Format( "{0}/{1} done in {2} ms.", receivedStrings.Count, forums.Count(), elapsed.TotalSeconds) ); } ); } IObservable<String> Calculate(String source) { Random rand = new Random(); return Observable.Defer(() => Observable.Start(() => { // simulate some work, taking different time, // to get the threads end in an other order than they've been started System.Threading.Thread.Sleep(rand.Next(500, 2000)); return source; })); } static readonly String[] forums = new string[] { "announce", "whatforum", "reportabug", "suggest", "Offtopic", "msdnsandbox", "netfxsetup", "netfxbcl", "wpf", "regexp", "msbuild", "netfxjscript", "clr", "netfxtoolsdev", "asmxandxml", "netfx64bit", "netfxremoting", "netfxnetcom", "MEFramework", "ncl", "wcf", "Geneva", "MSWinWebChart", "dublin", "oslo", // … some more elements }; } }
Answer: 1
Ignoreing the lack of MVVM, IoC, Testability etc... You dont implement INotifyPropertyChanged, you are not using ObservableCollection(of T). 1. Change your public field to a pubilc readonly property 2. Use ObservableCollection instead of IList
//public IList<String> receivedStrings; private readonly ObservableCollection<string> _receivedStrings = new ObservableCollection<string>(); public ObservableCollection<string> ReceivedStrings { get { return _receivedStrings;} }
you may also have to use the ObserveOnDispatcher() to ensure that you are called back on the Dispatcher, as you cant update the UI (even via Binding) on a thread that is not the Dispatcher's thread.
obsCollection.Merge(this._parallelThreadsAmount) .ObserveOn(Scheduler.Dispatcher) //-or-.ObserveOnDispatcher() //-or even better -.ObserveOn(_schedulerProvider.Dispatcher) .Subscribe(
by : Lee Campbellhttp://stackoverflow.com/users/393615Answer: 2
Somehow I'm tempted to say that you are a bit confused with Rx, which is quite normal to be honest :)
As Lee Campbell writes there is a few things for starters, the resizing issue is not related to Rx at all as I see it but it's the ObservableCollection stuff.
Besides that I have modified your code a bit to handle some of the things I see, I'm not sure if it is exactly what you want since you have a specific assignment of how parallel you want stuff, but I'm gonna be bold and say it's not your concern (and hopefully I'm not being a smartass here).
private void Button_Click(object sender, RoutedEventArgs e) { //extension method in Rx for wrapping var obsCollection = forums.ToObservable(); //Observing on the dispatcher to prevent x-thread exceptions obsCollection.Select( Calculate ).ObserverOnDispatcher().Subscribe( receivedString => { receivedStrings.Add( String.Format("{0} - Received: {1}", receivedStrings.Count, y) ); }, ()=>{ DateTime end = DateTime.Now; TimeSpan elapsed = end - start; this.receivedStrings.Add( String.Format( "{0}/{1} done in {2} ms.", receivedStrings.Count, forums.Count(), elapsed.TotalSeconds) ); } ); } Random rand = new Random(); IObservable<string> Calculate(string inputString) { //launching the "calculation" on the taskpool...can be changed to other schedulers return Observable.Start( ()=>{ Thread.Sleep(rand.Next(150,250)); return inputString; }, Scheduler.ThreadPool ); }
by : cyberzedhttp://stackoverflow.com/users/94302
No comments:
Post a Comment
Send us your comment related to the topic mentioned on the blog