Operatori de programare reactivă în RxJava 2

Dacă aplicația dvs. Android are în vedere raportarea acestor recenzii de cinci stele pe Google Play, atunci trebuie să aibă posibilitatea de a efectua mai multe activități.

Ca un minim minim, utilizatorii de telefonie din ziua de astăzi se așteaptă să poată interacționa cu aplicația în timp ce lucrează în fundal. Acest lucru ar putea părea simplu, dar Android are un singur fir în mod implicit, deci dacă întâlniți așteptările publicului, atunci mai devreme sau mai târziu va trebui să creați câteva fire suplimentare.

În articolul precedent din această serie am primit o introducere în RxJava, o bibliotecă reactivă pentru JVM, care vă poate ajuta să creați aplicații Android care reacționează la date și evenimente în momentul în care apar. Dar puteți utiliza și această bibliotecă pentru a reacționa la date și evenimente simultan.

În acest post, vă voi arăta cum puteți utiliza operatorii RxJava pentru a face în cele din urmă concurența pe Android o experiență fără durere. Până la sfârșitul acestui articol, veți ști cum să utilizați operatorii RxJava pentru a crea fire suplimentare, pentru a specifica munca care ar trebui să apară pe aceste fire și apoi pentru a posta rezultatele înapoi în thread-ul general UI principal al Android-toate cu doar o câteva linii de cod.

Și, din moment ce nici o tehnologie nu este perfectă, vă voi spune și despre o capcană potențială majoră de adăugare a bibliotecii RxJava la proiectele dvs. - înainte de a vă arăta cum să utilizați operatorii pentru a asigura această problemă nu apare în propriile dvs. proiecte Android.

Prezentarea operatorilor

RxJava are o colecție enormă de operatori care sunt destinate în principal să vă ajute să modificați, să filtrați, să îmbinați și să transformați datele emise de dvs. Observabils. Veți găsi lista completă a operatorilor RxJava la documentele oficiale și, deși nimeni nu se așteaptă să memorați fiecare operator unic, merită să petreceți ceva timp prin citirea acestei liste, doar pentru a avea o idee grosolană despre diferitele transformări de date pe care le puteți efectua.

Lista de operatori a lui RxJava este deja destul de exhaustivă, dar dacă nu puteți găsi operatorul perfect pentru transformarea datelor pe care ați avut-o în minte, puteți întotdeauna să lanțați împreună mai mulți operatori. Aplicarea unui operator la un Observabil de obicei returnează altul Observabil, astfel încât să puteți continua să aplicați operatori până când obțineți rezultatele pe care le doriți.

Există prea mulți operatori RxJava pentru a acoperi într-un singur articol și documentele oficiale RxJava fac deja o treabă bună de a introduce toți operatorii pe care îi puteți utiliza pentru transformările de date, așa că mă voi concentra pe doi operatori care au cel mai mare potențial pentru a vă face viața mai ușoară ca dezvoltator Android: subscribeOn () și observeOn ()

Multithreading cu operatorii RxJava

Dacă aplicația dvs. va oferi cea mai bună experiență posibilă pentru utilizatori, atunci trebuie să fie capabilă să efectueze activități intense sau de lungă durată și să efectueze simultan mai multe sarcini, fără a bloca firul principal de interfață UI.

De exemplu, imaginați-vă că aplicația dvs. trebuie să preia unele informații din două baze de date diferite. Dacă efectuați ambele sarcini una după alta pe firul principal al Androidului, atunci nu numai că aceasta va dura o perioadă semnificativă de timp, dar interfața utilizator nu va răspunde până când aplicația nu va finaliza preluarea fiecărei informații din ambele baze de date . Nu este o experienta extraordinara a utilizatorului!

O soluție mult mai bună este crearea a două fire suplimentare în care puteți efectua ambele sarcini simultan fără a le bloca firul principal de interfață. Această abordare înseamnă că munca va fi terminată de două ori mai rapidă, și utilizatorul va putea continua să interacționeze cu interfața de utilizare a aplicației dvs. pe tot parcursul anului. Este posibil ca utilizatorii dvs. să nu știe chiar că aplicația dvs. efectuează o activitate intensă și de lungă durată în fundal - toate informațiile bazei de date vor apărea pur și simplu în interfața aplicației dvs., ca și cum ar fi prin magie!

Din cutie, Android oferă câteva instrumente pe care le puteți utiliza pentru a crea fire suplimentare, inclusiv Serviciuși IntentServices, dar aceste soluții sunt greu de implementat și pot duce rapid la coduri complexe, verbose. În plus, dacă nu implementați corect multithreading, puteți găsi o aplicație care scade memoria și aruncă tot felul de erori.

Pentru a face multiplicarea pe Android chiar și mai multă durere de cap, firul principal al UI de la Android este singurul fir care poate actualiza interfața de utilizare a aplicației. Dacă doriți să actualizați interfața de utilizare a aplicației cu rezultatul muncii efectuate orice alt fir, atunci va trebui de obicei să creați o manipulant pe firul principal al interfeței utile și apoi folosiți acest lucru manipulant pentru a transfera date din firul de fundal în firul principal. Acest lucru înseamnă mai mult cod, mai multă complexitate și mai multe oportunități de erori pentru a intra în proiectul dvs..

Dar RxJava dispune de doi operatori care vă pot ajuta să evitați mult din această complexitate și potențial de eroare.

Rețineți că utilizați acești operatori împreună cu Programatoare, care sunt în esență componente care vă permit să specificați fire. Pentru moment, gândiți-vă Scheduler ca fiind sinonim cu cuvântul fir.

  • subscribeOn (Scheduler): În mod prestabilit, un Observabil emite datele sale pe firul unde a fost declarat abonamentul, adică unde ați sunat .Abonati-va metodă. În Android, acesta este, în general, principalul thread UI. Puteți utiliza funcția subscribeOn () operator pentru a defini un altul Scheduler unde Observabil ar trebui să execute și să emită datele sale.
  • observeOn (Scheduler): Puteți utiliza acest operator pentru a vă redirecționa Observabilemisiile la un altul Scheduler, schimbând în mod efectiv firul în care Observabilse trimite notificările și, prin extensie, firul unde se consumă datele sale.

RxJava vine cu un număr de planificatoare pe care le puteți folosi pentru a crea diferite fire, printre care:

  • Schedulers.io (): Proiectat pentru a fi utilizat pentru sarcini legate de IO. 
  • Schedulers.computation (): Proiectat pentru a fi utilizat pentru sarcini de calcul. În mod implicit, numărul de fire din planificatorul de calcul este limitat la numărul de procesoare disponibile pe dispozitiv.
  • Schedulers.newThread (): Creează un fir nou.

Acum, aveți o imagine de ansamblu asupra tuturor pieselor în mișcare, să aruncăm o privire la câteva exemple despre cum subscribeOn () și observeOn () sunt utilizate și vedeți unii planificatori în acțiune.

subscribeOn ()

În Android, veți folosi de obicei subscribeOn () și un însoțitor Scheduler pentru a schimba firul în care se efectuează o muncă lungă sau intensă, astfel încât nu există riscul blocării filetului principal al UI. De exemplu, puteți decide să importați o cantitate mare de date pe io () programator sau efectua unele calcule pe calcul() Scheduler.

În următorul cod, creăm un fir nou în care Observabil va executa operațiunile sale și va emite valorile 1, 2, și 3.

Observabile.just (1, 2, 3) .subscribeOn (Schedulers.newThread ()) .subscribe (Observer); 

În timp ce acest lucru este tot ce trebuie să creați un fir și să începeți să emiteți date despre acel fir, puteți dori o confirmare a faptului că acest observator funcționează într-adevăr pe un fir nou. O metodă este de a tipări numele firului pe care aplicația dvs. îl utilizează în mod curent, în Android StudioLogcat Monitor.

În mod convenabil, în postul anterior, începeți cu RxJava, am creat o aplicație care trimite mesaje către Logcat Monitor în diferite etape în timpul ciclului de viață Observable, astfel încât să putem reutiliza o mulțime de acest cod.

Deschideți proiectul pe care l-ați creat în postarea respectivă și optimizați codul, astfel încât să utilizați cele de mai sus Observabil ca sursă Observabil. Apoi adăugați subscribeOn () Operatorul și specificați că mesajele trimise la Logcat trebuie să includă numele firului curent.

Proiectul finalizat ar trebui să arate astfel:

import șiroid.support.v7.app.AppCompatActivity; import android.os.Bundle; import șiroid.util.Log; import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; clasa publica MainActivity extinde AppCompatActivity public static final String TAG = "MainActivity"; @Override protejate void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Observabile.just (1, 2, 3) .subscribeOn (Schedulers.newThread ()) .subscribe (Observer);  Observator Observer = Observator nou() @Override public void onSubscribe (Disponibil d) Log.e (TAG, "onSubscribe" + Thread.currentThread (). GetName ());  @Overide public void onNext (Valoare intreg) Log.e (TAG, "onNext:" + valoare + Thread.currentThread (). GetName ());  @Override publice void onError (Throwable e) Log.e (TAG, "onError:");  @Override public void onComplete () Log.e (TAG, "onComplete: All Done!" + Thread.currentThread (). GetName ()); ; 

Asigurați-vă că monitorul Logcat Monitor al aplicației Android Studio este deschis (selectând Android Monitor tab, urmată de logcat) și apoi executați proiectul pe un dispozitiv fizic Android sau pe un AVD. Ar trebui să vedeți următoarea ieșire în monitorul Logcat:

Aici puteți vedea asta .Abonati-va este apelat pe firul principal al UI, dar observatorul funcționează pe un fir complet diferit.

subscribeOn () operatorul va avea același efect indiferent unde îl plasați în lanțul observabil; cu toate acestea, nu puteți utiliza mai multe subscribeOn () operatori din același lanț. Dacă includeți mai mult de unul subscribeOn (), atunci lanțul tău va numai folosește subscribeOn () aceasta este cea mai apropiată de sursa observabilă.

observeOn ()

Spre deosebire de subscribeOn (), unde locuiți observeOn () în lanțul tău face , deoarece acest operator modifică numai firul folosit de observabilele care apar în aval

De exemplu, dacă ați introdus următoarele în lanțul dvs., fiecare observabil care apare în lanț din acest punct va folosi noul fir.

.observeOn (Schedulers.newThread ())

Acest lanț va continua să ruleze pe firul nou până când se întâlnește cu altul observeOn () operator, moment în care va trece la firul specificat de operatorul respectiv. Puteți controla firul în care observatorii specifici trimit notificările prin introducerea mai multor observeOn () operatori în lanțul dvs..

La dezvoltarea aplicațiilor Android, veți utiliza în general observeOn () pentru a trimite rezultatul lucrărilor efectuate asupra firelor de fundal la firul principal de interfață cu Android. Cea mai ușoară cale de a redirecționa emisiile către firul principal de interfață UI de la Android este să utilizați Planificatorul AndroidSchedulers.mainThread, care este inclus ca parte a bibliotecii RxAndroid, mai degrabă decât biblioteca RxJava. 

Biblioteca RxAndroid include legături specifice pentru Android pentru RxJava 2, ceea ce îl face o resursă suplimentară valoroasă pentru dezvoltatorii Android (și ceva ce vom analiza mult mai detaliat în următorul post din această serie).

Pentru a adăuga RxAndroid la proiectul dvs., deschideți modulul la nivel build.gradle fișier și adăugați cea mai recentă versiune a bibliotecii în secțiunea dependențe. La momentul redactării, cea mai recentă versiune a lui RxAndroid a fost 2.0.1, așa că adaug următoarele:

dependente ... compile 'io.reactivex.rxjava2: rxandroid: 2.0.1'

După adăugarea acestei biblioteci în proiectul dvs., puteți specifica că rezultatele unei observabile ar trebui trimise la firul principal al aplicației pentru aplicația dvs., utilizând o singură linie de cod:

.observeOn (AndroidSchedulers.mainThread ())

Având în vedere că comunicarea cu firul principal al aplicației din aplicația dvs. are o pagină completă din documentele oficiale Android, aceasta este o îmbunătățire imensă care ar putea să vă economisească mult timp atunci când creați aplicații Android cu mai multe fire.

RwJava Major Drawback

În timp ce RxJava are multe de oferit dezvoltatorilor Android, nici o tehnologie nu este perfectă, iar RxJava are o capcană majoră care are potențialul de a vă prăbuși aplicația.

În mod implicit, RxJava operează un flux de lucru bazat pe push-push: datele sunt produse în amonte de către un Observabil, și este apoi împins în aval la cel alocat Observator. Problema principală cu un flux de lucru bazat pe push este cât de ușor este pentru producător (în acest caz, Observabil) să emită prea rapid articole pentru consumator (Observator) a procesa.

O discuție Observabil și o lentă Observator poate duce rapid la o întârziere a elementelor neconsumate, care va alimenta resursele sistemului și poate duce chiar la OutOfMemoryException. Această problemă este cunoscută sub numele de contrapresiune.

Dacă bănuiți că apariția contrapresiunii apare în aplicația dvs., atunci există câteva soluții posibile, inclusiv utilizarea unui operator pentru a reduce numărul de articole produse.

Crearea perioadelor de eșantionare cu probă() și throttlefirst ()

Daca un Observabil emit un număr mare de articole, atunci este posibil să nu fie necesar pentru cei alocați Observator a primi fiecare un singur dintre aceste elemente.

Dacă puteți ignora în siguranță un anumit număr Observabilemisiile, atunci există câțiva operatori pe care îi puteți folosi pentru a crea perioade de eșantionare și apoi pentru a alege valorile specifice care sunt emise în aceste perioade:

  • probă() Operatorul verifică ieșirea Observatorului la intervalele specificate de dvs. și apoi ia cel mai recent element emis în timpul acelei perioade de eșantionare. De exemplu, dacă includeți .eșantion (5, SECONDS) în proiectul dvs., Observatorul va primi ultima valoare emisă în fiecare interval de cinci secunde. 
  • throttleFirst () Operatorul ia prima valoare care a fost emisă în timpul perioadei de eșantionare. De exemplu, dacă includeți .Throttlefirst (5, SECONDS) atunci Observatorul primește prima valoare emisă în fiecare interval de cinci secunde.  

Emisiile de dozare cu tampon()

Dacă nu puteți săriți fără emisii de emisii, atunci este posibil să aveți în continuare o presiune de pe o luptă Observator prin gruparea emisiilor în loturi și apoi prin trimiterea acestora en masse. Procesarea emisiilor în vrac este de obicei mai eficientă decât prelucrarea separată a emisiilor multiple, prin urmare această abordare ar trebui să îmbunătățească rata de consum.

Aveți posibilitatea să creați emisii în vrac utilizând tampon() operator. Aici, folosim tampon() să eșueze toate articolele emise pe o perioadă de trei secunde:

(0, 10) .buffer (3, SECONDS) .subscribe (System.out :: println);

Alternativ, puteți utiliza tampon() pentru a crea un lot format dintr-un număr specific de emisii. De exemplu, aici vă spunem tampon() pentru a grupa emisiile în grupuri de patru:

Obsabili.range (0, 10) .buffer (4) .subscribe (System.out :: println);

Înlocuirea obiectelor observabile cu fluide

O metodă alternativă de reducere a numărului de emisii este înlocuirea Observabil asta te provoacă probleme cu a curgător.

În RxJava 2, echipa RxJava a decis să împartă standardul Observabil în două tipuri: tipul obișnuit pe care l-am vizionat în această serie și curgătors.

curgătors funcția în același fel ca Observabils, dar cu o diferență majoră: curgătors trimite doar cât mai multe elemente ca cererile de observator. Dacă aveți un Observabil care emit mai multe elemente decât observatorul său atribuit poate consuma, atunci poate doriți să luați în considerare trecerea la o curgător in schimb.

Înainte de a începe să utilizați curgătors în proiectele dvs., trebuie să adăugați următoarea declarație de import:

import io.reactivex.Flowable;

Puteți crea apoi curgătorfolosind exact aceleași tehnici utilizate pentru a crea Observabils. De exemplu, fiecare dintre următoarele fragmente de cod va crea un curgător care este capabil să emită date:

curgător flowable.fromArray (new String [] "sud", "north", "west", "east"
curgător flowable = Flowable.range (0, 20); ... flowable.subscribe ()

În acest moment, vă puteți întreba: de ce aș folosi vreodată? Observabilatunci când pot folosi doar curgătors și nu trebuie să vă faceți griji cu privire la contrapresiune? Răspunsul este că a curgător se confruntă cu mai mult decît cu un regulat Observabil, astfel încât, în interesul creării unei aplicații cu performanțe ridicate, ar trebui să vă lipiți Observabilcu excepția cazului în care bănuiți că cererea dvs. se luptă cu contrapresiunea.

Single

A curgător nu este singura variantă Observabil pe care îl veți găsi în RxJava, deoarece biblioteca include și Singur clasă.

Single sunt utile atunci când trebuie doar să emități o valoare. În aceste scenarii, crearea unui Observabil se poate simți ca o suprasolicitare, dar a Singur este proiectat să emită pur și simplu o singură valoare și apoi să fie complet, fie prin apelarea:

  • onSuccess (): The Singur își evaluează singura valoare.  
  • onerror (): În cazul în care Singur nu este în măsură să emită elementul său, atunci va trece această metodă rezultatul Dispensabil.

A Singur va suna doar una dintre aceste metode și apoi va termina imediat.

Să ne uităm la un exemplu de a Singur în acțiune - din nou, pentru a economisi timpul, vom reutiliza codul:

import android.os.Bundle; import șiroid.support.v7.app.AppCompatActivity; import șiroid.util.Log; import io.reactivex.Single; import io.reactivex.SingleObserver; import io.reactivex.disposables.Disposable; clasa publica MainActivity extinde AppCompatActivity public static final String TAG = "MainActivity"; @Override protejate void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Single.just ("Hello World") .Subscribe (getSingleObserver ());  privat SingleObserver getSingleObserver () returnează noul SingleObserver() @Override public void onSubscribe (Disponibil d) Log.e (TAG, "onSubscribe");  @Override public void onSuccess (valoarea șirului) Log.e (TAG, "onSuccess:" + valoare);  @Override publice void onError (Throwable e) Log.e (TAG, "onError:"); ; 

Rulați proiectul pe un dispozitiv AVD sau fizic Android și veți vedea următoarea ieșire în monitorul Logcat Monitor Android:

Dacă vă răzgândiți și doriți să convertiți a Singur într-un Observabil în orice moment, apoi din nou RxJava are toți operatorii de care aveți nevoie, inclusiv:

  • mergeWith (): Fuzionează multiple Single într-un singur Observabil
  • concatWith (): Lanțurile articolelor emise de mai multe Single împreună, pentru a forma un Observabil emisie. 
  • toObservable (): Convertește a Singur într-un Observabil care emite elementul care a fost inițial emis de Unic, și apoi finalizează.

rezumat

În acest post am explorat unii operatori RxJava pe care îl puteți utiliza pentru a crea și gestiona mai multe fire, fără complexitatea și potențialul de eroare, care în mod tradițional este însoțit de multithreading pe Android. De asemenea, am văzut cum puteți folosi biblioteca RxAndroid pentru a comunica cu firul principal al UI principal al aplicației Android utilizând o singură linie de cod și cum să asigurați că contrapresiunea nu devine o problemă în aplicația dvs..

Am atins biblioteca RxAndroid de câteva ori pe parcursul acestei serii, dar această bibliotecă este plină de legături RxJava specifice Android care pot fi de neprețuit atunci când lucrăm cu RxJava pe platforma Android, deci în ultimul post din această serie vom să te uiți la biblioteca RxAndroid mult mai detaliat.

Până atunci, verificați câteva dintre celelalte postări despre codificarea pentru Android!

Cod