Python este una dintre cele mai populare limbi pentru prelucrarea datelor și știința datelor în general. Ecosistemul oferă o mulțime de biblioteci și cadre care facilitează computerele de înaltă performanță. Făcând programare paralelă în Python se poate dovedi destul de complicat.
În acest tutorial, vom studia de ce paralelismul este greu, mai ales în contextul Python, și pentru asta vom trece prin următoarele:
Blocarea internațională a interpreților (GIL) este unul dintre subiectele cele mai controversate din lumea Python. În CPython, cea mai populară implementare a Python, GIL este un mutex care face ca lucrurile să fie în siguranță. GIL-ul facilitează integrarea cu biblioteci externe care nu sunt siguri pe fire și face codul paralel mai rapid. Acest lucru vine la un cost, deși. Datorită modelului GIL, nu putem realiza paralelismul adevărat prin multithreading. Practic, două fire diferite native ale aceluiași proces nu pot rula codul Python simultan.
Lucrurile nu sunt atât de rele, și de aici, de ce: lucrurile care se întâmplă în afara domeniului GIL sunt libere să fie paralele. În această categorie intră sarcini de lungă durată precum I / O și, din fericire, bibliotecile NumPy
.
Deci, Python nu este cu adevărat multithreaded. Dar ce este un fir? Să facem un pas înapoi și să privim lucrurile în perspectivă.
Un proces este o abstractizare de bază a sistemului de operare. Este un program care este în execuție - cu alte cuvinte, cod care rulează. Mai multe procese rulează întotdeauna într-un computer și se execută în paralel.
Un proces poate avea mai multe fire. Ele execută același cod aparținând procesului părinte. În mod ideal, ele rulează în paralel, dar nu neapărat. Motivul pentru care procesele nu sunt suficiente este faptul că aplicațiile trebuie să răspundă și să asculte acțiunile utilizatorilor în timp ce actualizează afișarea și salvează un fișier.
Dacă este încă un pic neclar, iată o cheschetă:
PROCESE | FIRE |
---|---|
Procesele nu împart memoria | Firele partajează memoria |
Procesele de reproducere / comutare sunt scumpe | Firele de reproducere / comutare sunt mai puțin costisitoare |
Procesele necesită mai multe resurse | Firele necesită mai puține resurse (uneori numite procese ușoare) |
Nu este necesară sincronizarea memoriei | Trebuie să utilizați mecanisme de sincronizare pentru a vă asigura că gestionați corect datele |
Nu există o rețetă care să găzduiască totul. Alegerea unei persoane este foarte dependentă de contextul și de sarcina pe care încercați să o atingeți.
Acum mergem cu un pas mai departe și ne vom arunca în concurență. Concurrency este de multe ori greșit înțeleasă și confundată cu paralelismul. Nu este cazul. Concurrency implică planificarea codului independent pentru a fi executat în mod cooperativ. Profitați de faptul că o bucată de cod este în așteptare la operațiunile de I / O, iar în acel moment rulați o parte diferită, dar independentă a codului.
În Python, putem obține un comportament concurent ușor, prin intermediul verdele. Din perspectiva paralelizării, folosirea firelor sau a verdele este echivalentă deoarece nici una dintre ele nu se execută în paralel. Grindele sunt chiar mai puțin costisitoare decât firele. Din acest motiv, verdele sunt folosite foarte mult pentru a efectua un număr imens de sarcini I / O simple, cum ar fi cele de obicei găsite în rețele și servere web.
Acum, când știm diferența dintre fire și procese, paralel și concomitent, putem ilustra modul în care sunt îndeplinite diferite sarcini pe cele două paradigme. Iată ce vom face: vom executa, de mai multe ori, o sarcină în afara GIL și una din interiorul acesteia. Le executăm în serie, folosind fire și folosind procese. Să definim sarcinile:
import os import timp import filet import multiprocesare NUM_WORKERS = 4 def only_sleep (): "" Nu face nimic, așteptați ca un cronometru să expire "" "print (" PID:% s, Nume proces:% s, "% (os.getpid (), multiprocessing.current_process () name, threading.current_thread () .name)) time.sleep (1) def crunch_numbers ():" :% s, Numele procesului:% s, Numele fileului:% s "% (os.getpid (), multiprocessing.current_process () .name, threading.current_thread < 10000000: x += 1
Am creat două sarcini. Ambele sunt lungi, dar numai crunch_numbers
activele efectuează calcule. Să fugim doar somn
în serie, multithreaded și folosind mai multe procese și comparați rezultatele:
## Rulați sarcinile în serie start_time = time.time () pentru _ în intervalul (NUM_WORKERS): only_sleep () end_time = time.time () print ("Serial time =" end_time - start_time) fișierul thread.start () pentru threadul în fire [thread.join () pentru firul în fire] end_time = time.time () () timp de lucru (=), timpul de lucru (=), timpul de execuție () start () pentru proces în procese] [process.join () pentru proces în procese] end_time = time.time () print ("Paralel timp =", end_time - start_time)
Iată ieșirea pe care o am (a ta trebuie să fie similară, deși PID-urile și orele vor varia puțin):
PID: 95726, Denumire proces: MainProcess, Denumire proces: MainThread PID: 95726, Nume proces: MainProcess, Nume subiect: MainThread PID: 95726 Nume proces: MainProcess, : MainThread Serial time = 4.018089056015015 PID: 95726, Nume proces: MainProcess, Nume file: Thread-1 PID: 95726, Nume proces: MainProcess, Thread-2 PID: 95726, 3 PID: 95726, Nume proces: MainProcess, Nume fir: Thread-4 Threads time = 1.0047411918640137 PID: 95728, Nume proces: Process-1, Thread Name: MainThread PID: 95729, Process Name: MainThread PID: 95730, Denumire proces: Process-3, Nume fir: MainThread PID: 95731, Nume proces: Process-4, Nume fir: MainThread Parallel time = 1.014023780822754
Iată câteva observații:
În cazul abordare serială, lucrurile sunt destul de evidente. Funcționăm unul după altul. Toate cele patru runde sunt executate de același thread din același proces.
Utilizarea proceselor am redus timpul de execuție până la un sfert din timpul original, doar pentru că sarcinile sunt executate în paralel. Observați modul în care fiecare sarcină este efectuată într-un proces diferit și pe MainThread
din acest proces.
Utilizarea firelor profităm de faptul că sarcinile pot fi executate simultan. Timpul de execuție este de asemenea redus la un sfert, deși nimic nu rulează în paralel. Iată cum merge: începem primul fir și începe să aștepte expirarea timerului. Am întrerupe execuția, lăsând-o să aștepte expirarea temporizatorului, iar în acest timp vom da al doilea fir. Repetăm acest lucru pentru toate firele. Într-un moment, temporizatorul primei fire expiră, așa că schimbăm execuția la el și o terminăm. Algoritmul este repetat pentru al doilea și pentru toate celelalte fire. În cele din urmă, rezultatul este ca și cum lucrurile s-ar desfășura în paralel. De asemenea, veți observa că cele patru fire diferite diferă de la și trăiesc în interiorul aceluiași proces: MainProcess
.
Puteți observa chiar că abordarea filetată este mai rapidă decât cea cu adevărat paralelă. Asta se datorează cheltuielilor generatoare ale procesului de reproducere. După cum am observat anterior, procesele de reproducere și de comutare sunt o operație costisitoare.
Să facem aceeași rutină, dar de data aceasta rularea crunch_numbers
sarcină:
start_time = time.time () pentru _ în intervalul (NUM_WORKERS): crunch_numbers () end_time = time.time () imprimare ("Serial time =", end_time - start_time) start_time = time.time () threads [threading.Thread (target = crunch_numbers) pentru _ în intervalul (NUM_WORKERS)] [thread.start () pentru firul în fire] [thread.join () pentru firul în fire] end_time = time.time () end_time - start_time) start_time = time.time () processes = [multiprocessing.Process (target = crunch_numbers) pentru _ în intervalul (NUM_WORKERS)] [process.start procese] end_time = time.time () print ("Paralel timp =", end_time - start_time)
Iată rezultatele pe care le am:
PID: 96285, Denumire proces: MainProcess, Denumire proces: MainThread PID: 96285, Nume proces: MainProcess, Denumire proces: MainThread PID: 96285, Process Name: MainProcess, : MainThread Serial time = 2.705625057220459 PID: 96285, Nume proces: MainProcess, Nume file: PID: 96285, Nume proces: MainProcess, Thread-2 PID: 96285, 3 PID: 96285, Denumire proces: MainProcess, Nume fir: Thread-4 Threads time = 2.6961309909820557 PID: 96289, Nume proces: Process-1, Nume subiect: MainThread PID: 96290, Process Name: MainThread PID: 96291, Denumire proces: Process-3, Nume fir: MainThread PID: 96292, Nume proces: Process-4, Nume fir: MainThread Parallel time = 0.8014059066772461
Principala diferență de aici este rezultatul abordării multithreaded. De data aceasta funcționează foarte asemănător cu abordarea serială și de aici: de vreme ce efectuează calcule și Python nu realizează paralelism real, firele rulează unul după altul, executându-se unul pe celălalt până când toate termină.
Python are API-uri bogate pentru a face programare paralelă / concurentă. În acest tutorial acoperim cele mai populare, dar trebuie să știți că pentru orice nevoie aveți în acest domeniu, există probabil ceva deja acolo care vă poate ajuta să vă atingeți obiectivul.
În secțiunea următoare, vom construi o aplicație practică în mai multe forme, folosind toate bibliotecile prezentate. Fără alte detalii, aici sunt modulele / bibliotecile pe care le vom acoperi:
filetat
: Modul standard de lucru cu fire în Python. Este un pachet API de nivel superior pentru funcționalitatea expusă de _fir
modul, care este o interfață la nivel inferior implementării firelor de operare.
concurrent.futures
: O parte componentă a bibliotecii standard care oferă un strat de abstractizare chiar mai înalt pe fire. Firele sunt modelate ca sarcini asincrone.
multiprocesare
: Similar cu filetat
modul, oferind o interfață foarte asemănătoare, dar folosind procese în loc de fire.
gevent și greenlets
: Greenlets, numite și micro-fire, sunt unități de execuție care pot fi programate în colaborare și pot îndeplini sarcini simultan fără mult.
țelină
: O coadă de sarcini distribuită la nivel înalt. Sarcinile sunt puse în coadă și executate simultan, utilizând diverse paradigme ca multiprocesare
sau gevent
.
Cunoașterea teoriei este frumoasă și bună, dar cel mai bun mod de a învăța este să construiești ceva practic, nu-i așa? În această secțiune, vom construi un tip clasic de aplicații care să treacă prin toate paradigmele diferite.
Să construim o aplicație care verifică durata de funcționare a site-urilor web. Există o mulțime de astfel de soluții, cele mai cunoscute fiind probabil Jetpack Monitor și Robot Uptime. Scopul acestor aplicații este să vă anunțe atunci când site-ul dvs. este în jos, astfel încât să puteți lua rapid măsuri. Iată cum funcționează:
Iată de ce este esențial să adoptați o abordare paralelă / concurentă a problemei. Pe măsură ce lista de site-uri web crește, trecerea prin listă în serie nu ne va garanta că fiecare site este verificat la fiecare cinci minute sau cam asa ceva. Site-urile web ar putea să scadă ore întregi, iar proprietarul nu va fi anunțat.
Să începem prin scrierea unor utilități:
# utils.py Timp de importare a importului de import Timp de import de tip WebsiteDownException (Excepție): pass def ping_website (adresa, timeout = 20): "" "Verificați dacă un site este în jos.Un site este considerat jos dacă status_code> = 400 dacă expirați timpul de expirare Aruncați un WebsiteDownException dacă sunt îndeplinite oricare dintre condițiile de pe site-ul web "" încercați: response = requests.head (adresa, timeout = timeout) dacă answer.status_code> = 400: logging.warning status_code =% s "% (adresa, answer.status_code)) ridica WebsiteDownException () cu excepția requests.exceptions.RequestException: logging.warning (" Timeout expirat pentru site-ul% s "% address) "" Trimiteți proprietarului adresei o notificare cu privire la faptul că site-ul lor este în jos "" Acum, doar vom dormi timp de 0,5 secunde, dar aici vă trimiteți un mesaj de e-mail, notificare push sau mesaj de scriere a textului "". info ("Notificarea proprietarului site-ului% s"% address) time.sleep (0.5) def check_webs ite (adresa): "" "Funcția utilitar: verificați dacă un site este în jos, dacă este cazul, notificați utilizatorul" "încercați: ping_website (exceptând WebsiteDownException: notify_owner
De fapt, avem nevoie de o listă de site-uri pentru a încerca sistemul nostru. Creați o listă proprie sau utilizați a mea:
# sites.py WEBSITE_LIST = ['http://envato.com', 'http://amazon.co.uk', 'http://amazon.com', 'http://facebook.com', ' http://google.com "," http://google.fr "," http://google.es "," http://google.co.uk "," http://internet.org " , 'http://gmail.com', 'http://stackoverflow.com', 'http://github.com', 'http://heroku.com', 'http: // really-cool- http://djangoproject.com "," http://rubyonrails.org "," http://basecamp.com "," http://trello.com "," http: //yiiframework.com ',' http://shopify.com ',' http://another-really-interesting-domain.co ',' http://airbnb.com ',' http: // instagram. com "," http://snapchat.com "," http://youtube.com "," http://baidu.com "," http://yahoo.com "," http: // live. com "," http://linkedin.com "," http://yandex.ru "," http://netflix.com "," http://wordpress.com "," http: // bing. com ',]
În mod normal, ați păstra această listă într-o bază de date împreună cu informațiile de contact ale proprietarului, pentru a le putea contacta. Deoarece acesta nu este subiectul principal al acestui tutorial și din motive de simplitate vom folosi această listă Python.
Dacă ați acordat foarte multă atenție, s-ar putea să fi observat două domenii cu adevărat lungi în listă, care nu sunt site-uri valide (sper că nimeni nu le-a cumpărat până când citești acest lucru ca să-mi dovedească greșită!). Am adaugat aceste doua domenii pentru a fi sigur ca avem cateva site-uri pe fiecare run. De asemenea, să numim aplicația noastră UptimeSquirrel.
În primul rând, să încercăm abordarea serială și să vedem cât de greu se realizează. Vom considera această linie de bază.
# serial_squirrel.py timpul de încărcare start_time = time.time () pentru adresa în WEBSITE_LIST: check_website (adresa) end_time = time.time () print ("Timpul pentru SerialSquirrel:% ssecs" : Timpul expirat pentru site-ul http://really-cool-available-domain.com # AVERTISMENT: root: Timpul expirat pentru site-ul http://another-really-interesting-domain.co # AVERTISMENT: root: Website http: // bing.com a revenit status_code = 405 # Timp pentru SerialSquirrel: 15.881232261657715secs
Vom deveni puțin mai creativi cu implementarea abordării filetate. Utilizăm o coadă pentru a plasa adresele și a crea fire de lucru pentru a le scoate din coadă și pentru a le procesa. Vom aștepta ca coada să fie goală, ceea ce înseamnă că toate adresele au fost procesate de firmele noastre.
# threaded_squirrel.py timpul de import din coada de import Queue din importul filetelor Thread NUM_WORKERS = 4 task_queue = Defender () def (): # Verifică constant coada pentru adrese în timp ce True: address = task_queue.get () check_website sarcina prelucrată ca fiind terminată task_queue.task_done () start_time = time.time () # Creați firele thread-urilor lucrătorilor = [Thread (target = worker) pentru _ în intervalul (NUM_WORKERS)] # Adăugați site-urile web în coada de sarcini [task_queue. puneți elementul în elementul WEBSITE_LIST] # Începeți toți lucrătorii [thread.start () pentru firul în fire] # Așteptați ca toate sarcinile din coadă să fie procesate task_queue.join () end_time = time.time () print ("Timp pentru ThreadedSquirrel:% ssecs"%) (end_time - start_time)) # AVERTISMENT: root: Timpul expirat pentru site http://really-cool-available-domain.com # AVERTISMENT: root: Timpul expirat pentru http: /another-really-interesting-domain.co # AVERTISMENT: rădăcină: Website http://bing.com returnat status_code = 405 # Time for ThreadedSquirrel: 3.1107530 59387207secs
După cum sa menționat anterior, concurrent.futures
este un API la nivel înalt pentru utilizarea firelor. Abordarea pe care o luăm aici implică utilizarea unui a ThreadPoolExecutor
. Vom trimite sarcini la piscină și vom reveni la contracte futures, rezultate care vor fi disponibile în viitor. Desigur, putem aștepta ca toate contractele futures să devină rezultate efective.
# future_squirrel.py importați importul timpului concurrent.futures NUM_WORKERS = 4 start_time = time.time () cu concurrent.futures.ThreadPoolExecutor (max_workers = NUM_WORKERS) ca executor: futures = executor.submit (check_website, address) pentru adresa în WEBSITE_LIST concurrent.futures.wait (futures) end_time = time.time () print ("Timp pentru FutureSquirrel:% ssecs"% (end_time - start_time)) # AVERTISMENT: root: -domain.com # AVERTISMENT: root: Timpul expirat pentru site-ul web http://another-really-interesting-domain.co # AVERTISMENT: rădăcină: Website http://bing.com returnat status_code = 405 # Time for FutureSquirrel: 1.812899112701416secs
multiprocesare
bibliotecă oferă un API de înlocuire aproape inexistent pentru filetat
bibliotecă. În acest caz, vom adopta o abordare mai asemănătoare cu cea din concurrent.futures
unu. Înființăm o multiprocessing.Pool
și trimiterea de sarcini la aceasta prin maparea unei funcții în lista de adrese (gândiți-vă la Python clasic Hartă
funcţie).
# multiprocessing_squirrel.py import import socket import multiprocesare NUM_WORKERS = 4 start_time = time.time () cu multiprocessing.Pool (processes = NUM_WORKERS) ca pool: results = pool.map_async (check_website, WEBSITE_LIST) results.wait () end_time = Timpul expirat pentru site-ul http://really-cool-available-domain.com # AVERTISMENT: root: Timpul expirat pentru site-ul http://another-really-interesting-domain.co # AVERTISMENT: root: Website http://bing.com returnat status_code = 405 # Timp pentru MultiProcessingSquirrel: 2.8224599361419678secs
Gevent este o alternativă populară pentru realizarea concursului masiv. Există câteva lucruri pe care trebuie să le cunoașteți înainte de a le folosi:
Codul efectuat concomitent de verdele este determinist. Spre deosebire de celelalte alternative prezentate, această paradigmă garantează că pentru oricare două runde identice veți obține întotdeauna aceleași rezultate în aceeași ordine.
Aveți nevoie de funcții standard pentru patch-uri de maimuță, astfel încât acestea să coopereze cu gevent. Iată ce vreau să spun prin asta. În mod normal, o operațiune a soclului blochează. Așteptăm ca operația să se termine. Dacă am fi într-un mediu cu mai multe fire, planificatorul ar trece simplu la alt fir, în timp ce celălalt așteaptă I / O. Din moment ce nu suntem într-un mediu multithreaded, gevent patch-uri de funcții standard, astfel încât acestea să devină non-blocare și de control de întoarcere la scheduler geven.
Pentru a instala gevent, rulați: pip instalare gevent
Iată cum să folosim gevent pentru a ne îndeplini sarcina folosind a gevent.pool.Pool
:
# green_squirrel.py timpul de import de la importul gevent.pool Piscina de la gevent import monkey # Rețineți că puteți genera mulți lucrători cu gevent, deoarece costul creării și comutării este foarte redus NUM_WORKERS = 4 # Module-machete Patch monkey pentru cererile HTTP maimuță. patch_socket () start_time = time.time () pool = Pool (NUM_WORKERS) pentru adresa în WEBSITE_LIST: pool.spawn (check_website, address) Timp pentru GreenSquirrel:% ssecs "% (end_time - start_time)) # Timp pentru GreenSquirrel: 3.8395519256591797secs
Țelina este o abordare care diferă cel mai mult de ceea ce am văzut până acum. Este testat de luptă în contextul unor medii foarte complexe și de înaltă performanță. Setarea de țelină va necesita un pic mai tinkering decât toate soluțiile de mai sus.
În primul rând, va trebui să instalăm Țelina:
pip instalati telina
Sarcinile sunt conceptele centrale din cadrul proiectului Celery. Tot ce veți dori să rulați în țelina trebuie să fie o sarcină. Țelina oferă o mare flexibilitate pentru executarea sarcinilor: le puteți rula în mod sincron sau asincron, în timp real sau programat, pe aceeași mașină sau pe mai multe mașini și utilizând fire, procese, Eventlet sau gevent.
Aranjamentul va fi ceva mai complex. Țelina folosește alte servicii pentru trimiterea și primirea mesajelor. Aceste mesaje sunt, de obicei, sarcini sau rezultate din sarcini. Vom folosi Redis în acest tutorial în acest scop. Redis este o alegere excelentă deoarece este foarte ușor de instalat și configurat și este foarte posibil să îl utilizați deja în aplicația dvs. pentru alte scopuri, cum ar fi caching-ul și pub / sub.
Puteți instala Redis urmând instrucțiunile de pe pagina Redis Quick Start. Nu uitați să instalați Redis
Biblioteca Python, pip install redis
, și pachetul necesar pentru utilizarea Redis și Țelină: pip instalare telina [redis]
.
Porniți serverul Redis astfel: $ redis-server
Pentru a începe să construiți chestii cu Celery, va trebui mai întâi să creați o aplicație pentru Țelină. După aceea, Țelina trebuie să știe ce fel de sarcini ar putea executa. Pentru a realiza acest lucru, trebuie să înregistrăm sarcini în aplicația Celery. Vom face acest lucru folosind @ app.task
decorator:
# celery_squirrel.py timpul de import din importul utils check_website din importul de date WEBSITE_LIST din importul de țelină Țelină din importul de telina.result ResultSet app = Țelina ("celery_squirrel", broker = "redis: // localhost: 6379/0", backend = "redis : // localhost: 6379/0 ") @ app.task def check_website_task (adresa): return check_website (adresa) if __name__ ==" __main__ ": start_time = time.time = ResultSet ([check_website_task.delay (adresa) pentru adresa în WEBSITE_LIST]) # Așteptați ca sarcinile să se termine rs.get () end_time = time.time () print ("CelerySquirrel:" end_time - start_time) # CelerySquirrel: 2.4979639053344727
Nu intră în panică dacă nu se întâmplă nimic. Amintiți-vă, Țelina este un serviciu și trebuie să o executați. Până acum, am plasat doar sarcinile în Redis, dar nu am început să-i executăm pe Țelina. Pentru a face acest lucru, trebuie să executați această comandă în dosarul în care se află codul nostru:
țelina muncitoare -A do_celery --loglevel = debug --concurrency = 4
Acum repetați scriptul Python și vedeți ce se întâmplă. Unul dintre lucrurile pe care trebuie să le acordați este: observați cum am trecut de două ori adresa Redis la aplicația Redis. agent
parametrul specifică unde sunt transferate sarcinile către Țelina și backend
este locul în care Țelina pune rezultatele astfel încât să le putem folosi în aplicația noastră. Dacă nu specificăm un rezultat backend
, nu avem cum să știm când a fost procesată sarcina și care a fost rezultatul.
De asemenea, aveți în vedere că bușoanele sunt acum la ieșirea standard a procesului de țelină, deci asigurați-vă că le verificați în terminalul corespunzător.
Sper că aceasta a fost o călătorie interesantă pentru dvs. și o bună introducere în lumea programării paralele / concurente în Python. Acesta este sfârșitul călătoriei și există câteva concluzii pe care le putem trage:
filetat
și concurrent.futures
biblioteci.multiprocesare
oferă o interfață foarte asemănătoare cu filetat
ci mai degrabă pentru procese decât pentru fire.Aflați Python cu ghidul nostru complet de instrucțiuni Python, indiferent dacă sunteți doar începători sau sunteți un coder experimentat în căutarea unor noi abilități.