Concurrency în RxJava 2

O aplicație multithreaded are două sau mai multe părți care pot rula în paralel. Acest lucru permite aplicației să utilizeze mai bine nucleul din interiorul dispozitivului CPU. Acest lucru permite ca sarcinile să se facă mai repede și conduc la o experiență mai lină și mai receptivă pentru utilizator. 

Codarea pentru concurrency în Java poate fi dureroasă, dar datorită RxJava, acum este mult mai ușor de făcut. Cu RxJava, trebuie doar să declarați firul pe care doriți ca sarcina să fie executată (declarativ) în loc să creați și să gestionați firele (imperativ). 

RxJava face uz de Programatoare impreuna cu subscribeOn () și observeOn () operatorii de concurență pentru a realiza acest lucru. În acest tutorial, veți afla despre Programatoare,  subscribeOn () operatorul, observeOn () operatorului și, de asemenea, modul de utilizare a acestuia flatMap () operator pentru a realiza concurrency. Dar, mai întâi, să începem Programatoare în RxJava.

Cerințe preliminare 

Pentru a urmări împreună cu acest tutorial, ar trebui să fiți familiarizați cu:

  • RxJava 2 pe Android
  • expresii lambda

Verificați celelalte postări pentru a obține viteza pe elementele de bază ale expresiilor RxJava și lambda.

Programatorii în RxJava 2

Programatoare în RxJava sunt folosite pentru a executa o unitate de lucru pe un fir. A Scheduler oferă o abstracție la mecanismul de filetare Android și Java. Când doriți să executați o sarcină și faceți uz de a Scheduler să execute acea sarcină, Scheduler merge la bazinul de fire (o colecție de fire care sunt gata de utilizare) și apoi execută sarcina într-un thread disponibil. 

De asemenea, puteți specifica că o sarcină ar trebui să ruleze într-un fir specific. (Există doi operatori, subscribeOn () și observeOn (), care pot fi folosite pentru a specifica pe care fir din Scheduler thread piscină sarcina ar trebui să fie executat.)

După cum știți, în Android, procesele de lungă durată sau sarcinile intensive CPU nu ar trebui să fie difuzate pe firul principal. Dacă un abonament de către un utilizator Observator la un Observabil este realizat pe firul principal, orice operator asociat va rula și pe firul principal. În cazul unei sarcini de lungă durată (de exemplu, efectuarea unei solicitări de rețea) sau a unei sarcini intensive a procesorului (de exemplu, transformarea imaginii), acest lucru va bloca interfața de utilizator până la finalizarea sarcinii, ducând la dialogul îngrozitor ANR (Application Not Responding) și crashing app. Acești operatori pot fi trecuți la un alt fir cu observeOn () operator. 

În secțiunea următoare, vom explora diferitele tipuri Programatoare și utilizarea lor.

Tipuri de programatori

Iată câteva dintre tipurile de Programatoare disponibil in RxJava și RxAndroid pentru a indica tipul de fir pentru executarea sarcinilor. 

  • Schedulers.immediate (): returnează a Scheduler care execută lucrarea instantaneu în firul curent. Fiți conștienți de faptul că acest lucru va bloca firul curent, deci ar trebui folosit cu prudență. 
  • Schedulers.trampoline (): planifică sarcini în firul curent. Aceste sarcini nu sunt executate imediat, ci sunt executate după ce thread-ul și-a terminat sarcinile curente. Acest lucru este diferit de Schedulers.immediate () deoarece în loc să execute imediat o sarcină, așteaptă ca sarcinile curente să se finalizeze. 
  • Schedulers.newThread (): se declanșează un fir nou și se întoarce o Scheduler pentru a executa sarcina în noul thread pentru fiecare Observator. Ar trebui să fiți atenți folosind acest lucru deoarece firul nou nu este refolosit ulterior, ci este distrus. 
  • Schedulers.computation (): acest lucru ne dă Scheduler care este destinat unei lucrări computaționale intensive, cum ar fi transformarea imaginii, calcule complexe etc. Această operație utilizează pe deplin nucleele procesorului. Acest Scheduler folosește o dimensiune a filei fixe a filetului care depinde de miezurile procesorului pentru o utilizare optimă. Ar trebui să aveți grijă să nu creați mai multe fire decât nucleele CPU disponibile, deoarece acest lucru poate reduce performanța. 
  • Schedulers.io (): creează și returnează a Scheduler desemnate pentru lucrări legate de I / O, cum ar fi efectuarea de apeluri de rețea asincrone sau citirea și scrierea în baza de date. Aceste sarcini nu sunt CPU-intensive sau altceva să facă uz de Schedulers.computation ().
  • Schedulers.single (): creează și returnează a Scheduler și execută mai multe sarcini succesiv într-un singur fir. 
  • Schedulers.from (Executor executor): aceasta va crea o Scheduler care va executa o sarcină sau o unitate de lucru pe dată Executor testamentar
  • AndroidSchedulers.mainThread (): aceasta va crea o Scheduler care execută sarcina pe firul principal al aplicației Android. Acest tip de programator este furnizat de către RxAndroid bibliotecă. 

 subscribeOn () Operator

Prin utilizarea funcției subscribeOn () operatorul de concurență, specificați că Scheduler ar trebui să efectueze operațiunea în Observabil în amonte. Apoi va împinge valorile la observatorii utilizând același fir. Acum, să vedem un exemplu practic:

import android.os.Bundle; import șiroid.support.v7.app.AppCompatActivity; import șiroid.util.Log; import io.reactivex.Observable; import io.reactivex.ObservableOnSubscribe; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; clasa publica MainActivity extinde AppCompatActivity private static final String [] STATES = "Lagos", "Abuja", "Abia", "Edo", "Enugu", "Niger", "Anambra"; private Disposable mDisposable = null; @Override protejate void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Observabil observable = Obsable.create (dataSource ()) .subscribeOn (Schedulers.newThread ()) .doOnComplete (() -> Log.d ("MainActivity", "Complete")); mDisposable = observable.subscribe (s -> Log.d ("MainActivity", "primită" + s + "pe fir" + Thread.currentThread () getName ());  privată ObservationOnSubscribe dataSource () retur (emițător -> pentru (String stări: STATES) emitter.onNext (stat); Log.d ("MainActivity", "emitting" + state + "thread" + Thread.currentThread (). getName ()); Thread.sleep (600); emitter.onComplete (););  @Override protejate void onDestroy () if (mDisposable! = Null &&! MDisposable.isDisposed ()) mDisposable.dispose ();  super.onDestroy (); 

În codul de mai sus, avem un static ArrayList care conține unele state din Nigeria. Avem, de asemenea, un câmp care este de tip disponibil. Noi primim disponibil exemplu prin apel Observable.subscribe (), și o vom folosi ulterior când sunăm dispune() metoda de a elibera orice resurse care au fost utilizate. Acest lucru ajută la prevenirea scurgerilor de memorie. Al nostru sursă de date() (care poate returna date dintr-o sursă de baze de date la distanță sau locală) va reveni ObservableOnSubscribe: acest lucru este necesar pentru a ne crea propriile noastre Observabil ulterior folosind metoda Observable.create ()

În interiorul sursă de date() metode, ne bucle prin matrice, emitând fiecare element la observatorii sunând emitter.onNext (). După ce fiecare valoare este emisă, dormim firul pentru a simula munca intensă. În cele din urmă, numim onComplete () metodă de a semnala la observatorii că am depășit valorile trecute și că nu mai trebuie să ne așteptăm. 

Acum, ale noastre sursă de date() metoda nu ar trebui executată pe firul principal al interfeței utilizator. Dar cum este specificat acest lucru? În exemplul de mai sus, am furnizat Schedulers.newThread () ca un argument pentru subscribeOn (). Aceasta înseamnă că sursă de date() funcționarea va fi rulată într-un fir nou. Rețineți, de asemenea, că în exemplul de mai sus, avem doar unul Observator. Dacă am avea mai multe observatorii, fiecare dintre ei ar avea propriul fir. 

Ca să vedem cum funcționează asta, a noastră Observator imprimă valorile pe care le primește onNext () metoda de la Observabil

Când rulați acest lucru și vedeți logat-ul nostru pe Android Studio, puteți vedea că emisiile de la sursă de date() metoda pentru a Observator sa întâmplat în același fir-RxNewThreadScheduler-1-în care Observator le-a primit. 

Dacă nu specificați .subscribeOn () după metoda Observable.create () , aceasta va fi executată pe firul curent - care, în cazul nostru, este firul principal, blocând astfel interfața de utilizare a aplicației. 

Există câteva detalii importante pe care ar trebui să le cunoașteți în legătură cu subscribeOn () operator. Ar trebui să aveți doar una subscribeOn () în Observabil lanţ; adăugând altul oriunde în lanț nu va avea deloc efect. Locul recomandat pentru a pune acest operator este cât mai aproape de sursă posibil din motive de claritate. Cu alte cuvinte, plasați-o mai întâi în lanțul operatorului. 

Observable.create (dataSource ()) .subscribeOn (Schedulers.computation ()) // aceasta are efect .subscribeOn (Schedulers.io ()) // nu are efect .doOnNext (s -> saveToCache (s); executat pe Schedulers.computation ())

 observeOn () Operator

După cum am văzut, subscribeOn () operatorul de concurență va instrui Observabil care Scheduler să folosească pentru a împinge emisiile înainte de-a lungul Observabil lanț pentru a observatorii

Locul de muncă al observeOn () operatorul de concurență, pe de altă parte, este de a schimba emisiile ulterioare pe alt fir sau Scheduler. Folosim acest operator pentru a controla ce consumatori din aval vor primi emisiile. Să vedem un exemplu practic. 

import android.os.Bundle; import șiroid.support.v7.app.AppCompatActivity; import șiroid.util.Log; import șiroid.widget.TextView; import io.reactivex.Observable; import io.reactivex.ObservableOnSubscribe; import io.reactivex.android.schedulers.AndroidSchedulers; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; clasa publică ObserveOnActivity se extinde la AppCompatActivity privat de unică folosință mDisposable = null; @Override protejate void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); TextView textView = (TextView) findViewById (R.id.tv_main); Observabil observabile = Observable.create (DATASOURCE ()) .subscribeOn (Schedulers.newThread ()) .observeOn (AndroidSchedulers.mainThread ()) .doOnComplete (() -> Log.d ( "ObserveOnActivity", "Complete")); mDisposable = observable.subscribe (s -> . Log.d ( "ObserveOnActivity", "a primit" + s + "pe fir" + Thread.currentThread () getName ()); textView.setText (s););  privată ObservationOnSubscribe DATASOURCE () return (emițător -> Thread.sleep (800). emitter.onNext ( "Value"); Log.d ( "ObserveOnActivity", "DATASOURCE () pe fir" + Thread.currentThread () getName ( )); emitter.onComplete (););  // ... 

În codul de mai sus, am folosit observeOn () operator și apoi a trecut AndroidSchedulers.mainThread () la el. Ceea ce am făcut este să oprim firul Schedulers.newThread () la firul principal Android. Acest lucru este necesar pentru că vrem să actualizăm TextView widget și poate face acest lucru numai din firul principal de interfață. Rețineți că dacă nu treceți la firul principal atunci când încercați să actualizați TextView widget, aplicația se va prăbuși și ar arunca CalledFromWrongThreadException

spre deosebire de subscribeOn () operatorul, observeOn () operatorul poate fi aplicat de mai multe ori în lanțul operatorului, modificând astfel Scheduler mai mult de o dată. 

Observabil observable = Observable.create (dataSource ()) .subscribeOn (Schedulers.newThread ()) .observeOn (Schedulers.io ()) .doOnNext (s -> saveToCache; Log.d ("ObserveOnActivity", "doOnNext () pe fișierul "+ Thread.currentThread (). getName ());)) .observeOn (AndroidSchedulers.mainThread ()) .doOnComplete (() -> Log.d (" ObserveOnActivity ";

Acest cod are două observeOn () operatori. Primul utilizează Schedulers.io (), ceea ce înseamnă că saveToCache () metoda va fi executată pe Schedulers.io () fir. După aceasta, se trece apoi la AndroidSchedulers.mainThread () Unde observatorii vor primi emisiile din amonte. 

Concurrency Cu operatorul flatMap ()

flatMap () operator este un alt operator foarte puternic și important care poate fi folosit pentru a atinge concurrency. Definiția conform documentației oficiale este următoarea:

Transformați elementele emise de un observator în imagini observabile, apoi aplatizați emisiile de la acestea într-o singură observabilă.


Să aruncăm o privire la un exemplu practic care folosește acest operator: 

 // ... @Override protejat void onCreate (Bundle savedInstanceState) // ... final String [] state = "Lagos", "Abuja", "Imo", "Enugu"; Observabil statesObservable = Observabil.fromArray (state); stateObservable.flatMap (s -> observable.create (getPopulation (s)) .subscribe (pereche -> Log.d ("MainActivity", pair.first + "populația este" + pair.second));  privată ObservationOnSubscribe getPopulation (String state) retur (emițător -> Random r = nou Random (); Log.d ("MainActivity" )); emitter.onNext (noua pereche (state, r.nextInt (300000 - 10000) + 10000)); emitter.onComplete ();); 

Acest lucru va imprima următoarele în logcatul Android Studio:

getPopulation () pentru Lagos numit pe populația principală Lagos este 80362 getPopulation () pentru Abuja numit pe principala populație Abuja este 132559 getPopulation () pentru Imo numit pe populația principală Imo este 34106 getPopulation () pentru Enugu numit pe populația principală Enugu este 220301

Din rezultatul de mai sus, puteți vedea că rezultatele obținute au fost în aceeași ordine ca și în matrice. De asemenea getPopulation () pentru fiecare stare a fost procesată pe același fir - firul principal. Aceasta face ca rezultatul să fie lent, deoarece a fost procesat secvențial pe firul principal. 

Acum, pentru ca noi să realizăm concurrency cu acest operator, vrem getPopulation () pentru fiecare stat (emisii provenite de la statesObservable) pentru a fi procesate pe fire diferite. Acest lucru va duce la o procesare mai rapidă. Vom folosi flatMap () operator pentru a face acest lucru, deoarece creează un nou Observabil pentru fiecare emisie. Aplicăm apoi subscribeOn () operatorul de concurență la fiecare, trecând a Scheduler la el. 

 stateObservable.flatMap (s -> observable.create (getPopulation (s)) .subscribeOn (Schedulers.io ())) .subscribe (pereche -> Log.d ("MainActivity", pair.first + .al doilea));

Deoarece fiecare emisie produce un Observabil, flatMap () sarcina operatorului este de a le îmbina împreună și apoi a le trimite ca un singur flux. 

getPopulation () pentru Lagos numit pe RxCachedThreadScheduler-1 populația Lagos este 143965 getPopulation () pentru Abuja numit pe RxCachedThreadScheduler-2 getPopulation () pentru Enugu numit pe RxCachedThreadScheduler-4 populația Abuja este 158363 populația Enugu este 271420 getPopulation () pentru Imo numit pe RxCachedThreadScheduler -3 Imo populație este 81564

În rezultatul de mai sus, putem observa că fiecare stat este getPopulation () metoda a fost procesată pe fire diferite. Acest lucru face ca procesarea să fie mult mai rapidă, dar, de asemenea, să se observe că emisiile din flatMap () operator care a fost primit de către Observator nu sunt în aceeași ordine ca emisiile inițiale în amonte. 

Concluzie

În acest tutorial, ați aflat despre manipularea concurenței folosind RxJava 2: ce este, diferit Programatoare disponibile și cum se utilizează subscribeOn () și observeOn () operatori de concurență. De asemenea, v-am arătat cum să utilizați flatMap () operator pentru a realiza concurrency. 

Între timp, verificați câteva dintre celelalte cursuri și tutoriale despre limbajele Java și dezvoltarea aplicațiilor Android!

Cod