Un coprocessus, aussi appelé processus léger et thread en anglais, est un fil d’exécution d’un programme pouvant s’exécuter en parallèle avec d’autres coprocessus du même programme.
Ce chapitre décrit les appels systèmes Unix permettant à un programme de
créer des coprocessus et de les synchroniser à l’aide de verrous, de
conditions ou d’événements. Il s’appuie sur la bibliothèque OCaml
Thread
et les bibliothèques Mutex
, Condition
. Il présente également
la communication par événements synchrones fournie par la bibliothèque et
Event
.
La création d’un coprocessus est très différente de l’opération fork
qui crée une copie du processus courant (donc une copie du programme):
les espaces mémoire du père et du fils sont totalement disjoints après
l’exécution de fork
et les processus ne peuvent communiquer que par un
appel système (comme écrire ou lire dans un fichier ou dans un tuyau).
Au contraire, tous les coprocessus d’un même programme partagent le même espace d’adressage. Les seules données qui les différencient et qui ne sont pas partagées sont leur identité et leur pile d’exécution, ainsi que quelques informations pour le système (masque des signaux, état des verrous et conditions, etc.). De ce point de vue, les coprocessus ressemblent aux coroutines. Les coprocessus d’un même programme forment un groupe: ils sont tous traités de la même façon, sauf un, le coprocessus principal, qui a été créé au démarrage du programme et dont la terminaison entraîne l’arrêt de tous les autres coprocessus et l’arrêt du programme. Lorsque nous parlons de plusieurs coprocessus, nous considérons implicitement qu’ils s’agit des coprocessus d’un même programme.
À la différence des coroutines, qui ne peuvent pas s’exécuter en parallèle —l’une devant passer explicitement la main avant que l’autre ne puisse s’exécuter à son tour— les coprocessus s’exécutent en parallèle ou du moins tout se passe comme s’ils le faisaient, car ils peuvent être interrompus de façon préemptive par le système pour donner la main à d’autres coprocessus. De ce point de vue, les coprocessus ressemblent aux processus.
Le partage de l’espace mémoire permet aux coprocessus de communiquer directement entre eux par leur mémoire commune. Bien qu’en principe, deux processus n’aient pas besoin pour cela de passer par le système d’exploitation, le fait qu’ils s’exécutent en parallèle les oblige à se synchroniser au moment de la communication (l’un devant finir d’écrire avant que l’autre ne commence à lire), ce qui nécessite, en général, de passer par le système d’exploitation. La synchronisation entre coprocessus, quasiment indispensable pour toute communication, reste souvent une partie difficile de la programmation avec des coprocessus. Elle peut se faire à l’aide de verrous et de conditions, ou bien de façon plus évoluée, en communiquant par des événements.
Le gain à l’utilisation des coprocessus par rapport aux processus reste le coût moindre à leur création et la possibilité d’échanger de grosses structures de données sans recopie, simplement en s’échangeant un pointeur. Inversement, l’utilisation des coprocessus a un coût qui est de devoir bien gérer la synchronisation entre eux, y compris en cas d’erreur fatale dans l’un d’eux. En particulier un coprocesseur doit bien faire attention de libérer ses verrous et de préserver ses invariant avant de s’arrêter. Aussi, on pourra préférer les processus aux coprocessus lorsqu’on ne profite pas réellement des avantages de ces derniers.
Pour compiler une application utilisant des coprocessus natifs il faut faire:
ocamlc -thread unix.cma threads.cma -o prog mod1.ml mod2.ml mod3.ml
ocamlopt -thread unix.cmxa threads.cmxa -o prog mod1.ml mod2.ml mod3.ml
Si votre installation ne supporte pas les coprocessus natifs, vous pouvez voir à la section 7.8 ou dans le manuel comment utiliser les coprocessus simulés. Le discours et les exemples de ce chapitre supposent des coprocessus natifs et ne s’applique pas, en général, aux coprocessus simulés.
Les fonctions décrites dans cette section sont définies dans le module
Thread
.
L’appel système create f a crée un nouveau coprocessus dans lequel l’application de fonction f a est exécutée. La création d’un coprocessus retourne à l’appelant une poignée sur le coprocessus fraîchement créé qui peut être utilisée pour son contrôle.
val Thread.create : ('a -> 'b) -> 'a -> t |
Le calcul s’exécute de façon concurrente avec les calculs effectués par les autres coprocessus du programme. Le coprocessus termine lorsque l’application f a termine. Le résultat du calcul est simplement ignoré. Si le coprocessus termine par une exception non rattrapée, celle-ci n’est pas propagée dans le coprocessus appelant ou principal mais simplement ignorée après affichage d’un message dans la sortie d’erreur. (Les autres coprocessus ont a priori évolué indépendemment et ne seraient pas en mesure de recevoir l’exception.)
Un coprocessus peut aussi se terminer prématurément en appelant la fonction
exit (de la bibliothèque Thread
), à ne pas
confondre avec la fonction de même nom de la bibliothèque
Pervasives
qui termine le programme tout entier, i.e. tous ses coprocessus.
Le coprocessus principal d’un programme, qui est celui qui a lancé le
premier d’autres coprocessus, exécute implicitement la fonction
Pervasives.exit
lorsqu’il termine.
Si un coprocessus termine avant le coprocessus principal, alors il est
désalloué immédiatement, et ne devient pas un fantôme comme dans le cas d’un
processus Unix créé par fork
. La bibliothèque OCaml se charge de cette
gestion.
Nous en savons déjà assez pour proposer une alternative au modèle du
serveur concurrent par «fork» (ou «double fork») vu dans le cours précédent
en faisant effectuer le traitement par un coprocessus plutôt que par un
processus fils. Pour établir un tel serveur, il nous suffit de
proposer une variante Misc.co_treatment
de la fonction
Misc.fork_treatment
définie dans le chapitre 6.
let co_treatment server_sock service (client_descr, _ as client) = try ignore (Thread.create service client) with exn -> close client_descr; raise exn;; |
Si le coprocessus a pu être créé, le traitement est entièrement pris en
compte par la fonction service
y compris la fermeture de client_descr
.
Sinon, on ferme le descripteur client_descr
(le client est abandonné),
et on laisse le programme principal traiter l’erreur.
Attention, toute la difficulté du coserveur est cachée dans la fonction
service
qui doit traiter la connexion avec robustesse et jusqu’à la
déconnexion. Dans le cas du serveur concurrent où le service est exécuté
par un autre processus, une erreur fatale pendant le service conduisant à la
mort prématurée du service produit par défaut le bon comportement qui est
de fermer la connexion, car le système ferme les descripteurs à la mort d’un
processus. Il n’en est plus de même dans le cas où le service est exécuté
par un coprocessus, car les descripteurs entre les différents coprocessus
sont par défaut partagés et ne sont pas fermés à la mort du
coprocessus. C’est donc au coprocessus de fermer ses descripteurs avant de
mourir. De plus, un coprocessus ne peut pas appeler Pervasives.exit
dans le
cas d’une erreur fatale dans le traitement d’un service, car celle-ci
arrêterait non seulement le service mais aussi le serveur. Appelé
Thread.exit
n’est souvent pas non plus une solution, car on risque de ne
pas avoir bien désalloué les ressources ouvertes, et en particulier la
connexion. Une solution consiste à lever une exception signifiant un arrêt
fatal (par exemple une exception Exit
) provoquant l’exécution du code de
finalisation à sa remontée. Pour des raisons analogues, il est essentiel
que le signal sigpipe
soit dans un traitement du service par coprocessus,
remplaçant l’arrêt immédiat du coprocessus par la levée d’une exception
EPIPE
.
Les fonctions décrites dans cette section sont définies dans le module
Thread
.
L’appel système join permet à un coprocessus d’en attendre un autre.
val Thread.join : t -> unit |
Le coprocessus appelant est alors suspendu jusqu’à ce que celui dont l’identité est passée en argument ait terminé son exécution. Cet appel peut également être utilisé par le coprocessus principal pour attendre que tous les autres aient retourné avant de terminer lui-même et de terminer le programme (le comportement par défaut étant de tuer les autres coprocessus sans attendre leur terminaison).
Bien que cet appel soit bloquant donc du type «long», il est relancé
automatiquement à la réception d’un signal: il est effectivement interrompu
par un signal, le handler est traité, puis l’appel est relancé. On ne
revient donc de l’appel que quand le coprocessus à réellement terminé
et l’appel ne doit jamais lever l’exception EINTR
. Du point de vue du
programmeur OCaml, cela se comporte comme si le signal était reçu au moment
où l’appel retourne.
Un coprocessus ne retourne pas, car il s’exécute de façon asynchrone. Mais son action peut être observée — heureusement! — par ses effets de bords. Par exemple, un coprocessus peut placer le résultat d’un calcul dans une référence qu’un autre coprocessus ira consulter après s’être assuré de la terminaison du calcul. Nous illustrons cela dans l’exemple suivant.
exception Exited type 'a result = Value of 'a | Exception of exn let eval f x = try Value (f x) with z -> Exception z let coexec (f : 'a -> 'b) (x : 'a) : unit -> 'b = let result = ref (Exception Exited) in let p = Thread.create (fun x -> result := eval f x) x in function() -> match join p; !result with | Value v -> v | Exception exn -> raise exn;; let v1 = coexec succ 4 and v2 = coexec succ 5 in v1()+v2();; |
Le système peut suspendre un coprocessus pour donner temporairement la main à un autre ou parce qu’il est en attente d’une ressource utilisée par un de ses coprocessus (verrous et conditions, par exemple) ou par un autre processus (descripteur de fichier, par exemple). Un coprocessus peut également se suspendre de sa propre initiative. La fonction yield permet à un coprocessus de redonner la main prématurément (sans attendre la préemption par le système).
val Thread.yield : unit -> unit |
C’est une indication pour le gestionnaire de coprocessus, mais son effet peut être nul, par exemple, si aucun coprocessus ne peut s’exécuter immédiatement. Le système peut donc décider de redonner la main au même coprocessus.
Inversement, il n’est pas nécessaire d’exécuter yield
pour permettre à
d’autres coprocessus de s’exécuter, car le système se réserve le droit
d’exécuter lui-même la commande yield
à tout moment. En fait, il exerce ce
droit à intervalles de temps suffisamment rapprochés pour permettre à
d’autres coprocessus de s’exécuter et donner l’illusion à l’utilisateur que
les coprocessus s’exécutent en parallèle, même sur une machine
mono-processeur.
Exemple:
On peut reprendre et modifier l’exemple 3.3 pour utiliser des coprocessus plutôt que des processus.
let rec psearch k cond v = let n = Array.length v in let slice i = Array.sub v (i * k) (min k (n - i * k)) in let slices = Array.init (n/k) slice in let found = ref false in let pcond v = if !found then Thread.exit(); cond v in let search v = if simple_search pcond v then found := true in let proc_list = Array.map (Thread.create search) slices in Array.iter Thread.join proc_list; !found;; |
La fonction psearch
k f v recherche avec k coprocessus en parallèle
une occurrence dans le tableau satisfaisant la fonction f. La fonction
pcond
permet d’interrompre la recherche en cours dès qu’une réponse a été
trouvée. Tous les coprocessus partagent la même référence found
: ils
peuvent donc y accéder de façon concurrente. Il n’y a pas de section
critique entre les différents coprocessus car s’ils écrivent en parallèle
dans cette ressource, ils écrivent la même valeur. Il est important que les
coprocessus n’écrivent pas le résultat de la recherche quand celui-ci est
faux! par exemple le remplacement de la ligne 7 par
let search v = found := !found && simple_search pcond v |
ou même:
let search v = let r = simple_search pcond v in found := !found && r |
serait incorrect.
la recherche en parallèle est intéressante même sur une machine mono-processeur si la comparaison des éléments peut être bloquée temporairement (par exemple par des accès disques ou, mieux, des connexions réseau). Dans ce cas, le coprocessus qui effectue la recherche passe la main à un autre et la machine peut donc continuer le calcul sur une autre partie du tableau et revenir au coprocessus bloqué lorsque sa ressource sera libérée.
L’accès à certains éléments peut avoir une latence importante, de l’ordre de la seconde s’il faut récupérer de l’information sur le réseau. Dans ce cas, la différence de comportement entre une recherche séquentielle et une recherche en parallèle devient flagrante.
Les autres formes de suspension sont liées à des ressources du système d’exploitation. Un coprocessus peut se suspendre pendant un certain temps en appelant delay s. Une fois s secondes écoulées, il pourra être relancé.
val Thread.delay : float -> unit |
Cette primitive est fournie pour des raisons de portabilité avec les
coprocessus simulés, mais Thread.delay t
est simplement une abréviation
pour ignore (Unix.select [] [] [] t)
. Cet appel, contrairement à
Thread.join
, n’est pas relancé lorsqu’il est interrompu par un signal.
Pour synchroniser un coprocessus avec une opération externe, on peut
utiliser la commande select d’Unix. Bien entendu, celle-ci ne
bloquera que le coprocessus appelant et non le programme tout entier.
(Le module Thread
redéfinit cette fonction, car dans le cas
des coprocessus simulés, elle ne doit pas appeler directement cette
du module Unix
, ce qui bloquerait le programme tout entier,
donc tous les coprocessus. Il faut donc utiliser Thread.select
et non
Unix.select
, même si les deux sont équivalents dans le cas des coprocessus
natifs.)
Exemple:
Pour faire fonctionnner le crible d’Ératosthène avec des coprocessus plutôt que par duplication de processus Unix, il suffit de remplacer les lignes 30–41 par
let p = Thread.create filter (in_channel_of_descr fd_in) in let output = out_channel_of_descr fd_out in try while true do let n = input_int input in if List.exists (fun m -> n mod m = 0) first_primes then () else output_int output n done; with End_of_file -> close_out output; Thread.join p |
et les lignes 46–52 par
let k = Thread.create filter (in_channel_of_descr fd_in) in let output = out_channel_of_descr fd_out in generate len output; close_out output; Thread.join k;; |
Toutefois, il ne faut espérer aucun gain significatif sur cet exemple qui utilise peu de processus par rapport au temps de calcul.
Les fonctions de cette section sont définies dans le module Mutex
(pour Mutual exclusion, en anglais).
Nous avons évoqué ci-dessus un problème d’accès concurrent aux ressources mutables. En particulier, le scénario suivant illustre le problème d’accès aux ressources partagées. Considérons un compteur c et deux processus p et q, chacun incrémentant en parallèle le même compteur c.
Temps Coprocessus p lit k écrit k+1 Coprocessus q lit k écrit k+1 Valeur de c k k k+1 k+1
Supposons le scénario décrit dans la Figure 7.1. Le coprocessus p lit la valeur du compteur c, puis donne la main à q. À son tour, q lit la valeur de c puis écrit la valeur k+1 dans c. Le processus p reprend la main et écrit la valeur k+1 dans c. La valeur finale de c est donc k+1 au lieu de k+2.
Ce problème classique peut être résolu en utilisant des verrous qui empêchent l’entrelacement arbitraire de p et q.
Les verrous sont des objets partagés par l’ensemble des coprocessus d’un même programme qui ne peuvent être possédés que par un seul coprocessus à la fois. Un verrou est créé par la fonction create.
val Mutex.create : unit -> Mutex.t |
Cette opération ne fait que construire le verrou mais ne le bloque pas. Pour prendre un verrou déjà existant, il faut appeler la fonction lock en lui passant le verrou en argument. Si le verrou est possédé par un autre processus, alors l’exécution du processus appelant est gelée jusqu’à ce que le verrou soit libéré. Sinon le verrou est libre et l’exécution continue en empêchant tout autre processus d’acquérir le verrou jusqu’à ce que celui-ci soit libéré. La libération d’un verrou doit se faire explicitement par le processus qui le possède, en appelant unlock.
L’appel système Mutex.lock
se comporte comme Thread.join
vis-à-vis des
signaux: si le coprocessus reçoit un signal pendant qu’il exécute
Mutex.lock
, le signal sera pris en compte (i.e. le runtime OCaml
informé du déclanchement du signal), mais le coprocessus se remet en
attente de telle façon que Mutex.lock
ne retourne effectivement que
lorsque le lock
a effectivement été acquis et bien sûr Mutex.lock
ne
levera pas l’exception EINTR
. Le traitement réel du signal par OCaml se
fera seulement au retour de Mutex.lock
.
On peut également tenter de prendre un verrou sans bloquer en appelant try_lock
val Mutex.try_lock : Mutex.t -> bool |
Cette fonction retourne true
si le verrou a pu être pris (il était donc
libre) et false
sinon. Dans ce dernier cas, l’exécution n’est pas
suspendue, puisque le verrou n’est pas pris. Le coprocessus peut donc
faire autre chose et éventuellement revenir tenter sa chance plus tard.
Exemple:
Incrémenter un compteur global utilisé par plusieurs coprocessus pose un problème de synchronisation: les instants entre la lecture de la valeur du compteur et l’écriture de la valeur incrémentée sont dans une région critique, i.e. deux coprocessus ne doivent pas être en même temps dans cette région. La synchronisation peut facilement être gérée par un verrou.
type counter = { lock : Mutex.t; mutable counter : int } let newcounter() = { lock = Mutex.create(); counter = 0 } let addtocounter c k = Mutex.lock c.lock; c.counter <- c.counter + k; Mutex.unlock c.lock;; |
La seule consultation du compteur ne pose pas de problème. Elle peut être effectuée en parallèle avec une modification du compteur, le résultat sera simplement la valeur du compteur juste avant ou juste après sa modification, les deux réponses étant cohérentes.
Un motif fréquent est la prise de verrou temporaire pendant un appel de fonction. Il faut bien sûr prendre soin de le relâcher à la fin de l’appel que l’appel ait réussi ou échoué. On peut abstraire ce comportement dans une fonction de bibliothèque:
let run_with_lock l f x = Mutex.lock l; try_finalize f x Mutex.unlock l |
Dans l’exemple précédent, on aurait ainsi pu écrire:
let addtocounter c = Misc.run_with_lock c.lock (fun k -> c.counter <- c.counter + k) |
Exemple:
Une alternative au modèle du serveur avec coprocessus est de lancer un nombre de coprocessus à l’avance qui traitent les requêtes en parallèle.
val tcp_farm_server : int -> (file_descr -> file_descr * sockaddr -> 'a) -> sockaddr -> unit |
La fonction tcp_farm_server
se comporte comme tcp_server
mais
prend un argument supplémentaire qui est le nombre de coprocessus à lancer
qui vont chacun devenir serveurs sur la même adresse. L’intérêt d’une
ferme de coprocessus est de réduire le temps de traitement de chaque
connexion en éliminant le temps de création d’un coprocessus pour ce
traitement, puisque ceux-ci sont créés une fois pour toute.
let tcp_farm_server n treat_connection addr = let server_sock = Misc.install_tcp_server_socket addr in let mutex = Mutex.create() in let rec serve () = let client = Misc.run_with_lock mutex (Misc.restart_on_EINTR accept) server_sock in treat_connection server_sock client; serve () in for i = 1 to n-1 do ignore (Thread.create serve ()) done; serve ();; |
La seule précaution à prendre est d’assurer l’exclusion mutuelle autour de
accept
afin qu’un seul des coprocessus n’accepte une connexion au
même moment. L’idée est que la fonction treat_connection
fasse un traitement séquentiel, mais ce n’est pas une obligation et on peut
effectivement combiner une ferme de processus avec la création de nouveaux
coprocessus, ce qui peut s’ajuster dynamiquement selon la charge du service.
La prise et le relâchement d’un verrou est une opération peu coûteuse lorsqu’elle réussit sans bloquer. Elle est en général implémentée par une seule instruction «test-and-set» que possèdent tous les processeurs modernes (plus d’autres petits coûts induits éventuels tels que la mise à jour des caches). Par contre, lorsque le verrou n’est pas disponible, le processus doit être suspendu et reprogrammé plus tard, ce qui induit alors un coût supplémentaire significatif. Il faut donc retenir que c’est la suspension réelle d’un processus pour donner la main à un autre et non sa suspension potentielle lors de la prise d’un verrou qui est pénalisante. En conséquence, on aura presque toujours intérêt à relâcher un verrou dès que possible pour le reprendre plus tard si nécessaire plutôt que d’éviter ces deux opérations, ce qui aurait pour effet d’agrandir la région critique et donc la fréquence avec laquelle un autre coprocessus se trouvera effectivement en compétition pour le verrou et dans l’obligation de se suspendre.
Les verrous réduisent l’entrelacement. En contrepartie, ils risquent de provoquer des situations d’interblocage. Par exemple, il y a interblocage si le coprocessus p attend un verrou v possédé par le coprocessus q qui lui-même attend un verrou u possédé par p. (Dans le pire des cas, un processus attend un verrou qu’il possède lui-même...) La programmation concurrente est difficile, et se prémunir contre les situations d’interblocage n’est pas toujours facile. Une façon simple et souvent possible d’éviter cette situation consiste à définir une hiérarchie entre les verrous et de s’assurer que l’ordre dans lequel on prendra dynamiquement les verrous respecte la hiérarchie: on ne prend jamais un verrou qui n’est pas dominé par tous les verrous que l’on possède déjà.
On se propose de reprendre le relais HTTP développé dans le chapitre précédent pour servir les requêtes par des coprocessus.
Intuitivement, il suffit de remplacer la fonction
establish_server
qui crée un processus clone par une fonction qui crée un
coprocessus. Il faut cependant prendre quelques précautions...
En effet, la contrepartie des coprocessus est qu’ils partagent tous le même
espace mémoire. Il faut donc s’assurer que les coprocessus ne vont pas se
“marcher sur les pieds” l’un écrasant ce que vient juste de faire l’autre.
Cela se produit typiquement lorsque deux coprocessus modifient en parallèle
une même structure mutable.
Dans le cas du serveur HTTP, il y a quelques changements à
effectuer. Commençons par régler les problèmes d’accès aux ressources. La
fonction proxy_service
qui effectue le traitement de la connexion et
décrite à la section 6.13 appelle, par l’intermédiaire des
fonctions parse_host
, parse_url
et parse_request
, la fonction
regexp_match
qui utilise la bibliothèque Str
. Or, cette bibliothèque
n’est pas ré-entrante (le résultat de la dernière recherche est mémorisé
dans une variable globale). Cet exemple montre qu’il faut également se
méfier des appels de fonctions qui sous un air bien innocent cachent des
collisions potentielles. Dans ce cas, nous n’allons pas réécrire la
bibliothèque Str
mais simplement séquentialiser son utilisation. Il
suffit (et il n’y a pas vraiment d’autres choix) de protéger les appels à
cette bibliothèque par des verrous. Il faut quand même prendre la précaution
de bien libérer le verrou au retour de la fonction, pour un retour anormal
par une levée d’exception.
Pour modifier au minimum le code existant, il suffit de renommer la
définition de regexp_match
du module Url
en unsafe_regexp_match
puis de définir regexp_match
comme une version
protégée de unsafe_regexp_match
.
let strlock = Mutex.create();; let regexp_match r string = Misc.run_with_lock strlock (unsafe_regexp_match r) string;; |
La modification à l’air minime. Il faut toutefois remarquer que la fonction
regexp_match
regroupe à la fois le filtrage de l’expression et
l’extraction de la chaîne filtrée. Il eût bien sûr été incorrect de protéger
individuellement les fonctions Str.string_match
et Str.matched_group
.
Une autre solution serait de réécrire les fonctions d’analyse sans utiliser
la bibliothèque Str
. Mais il n’y a pas de raison pour
un tel choix tant que la synchronisation des primitives de la bibliothèque
reste simplement réalisable et ne s’avère pas être une source
d’inefficacité. Évidemment, une meilleure solution serait que la
bibliothèque Str
soit ré-entrante en premier lieu.
Les autres fonctions appelées sont bien réentrantes, notamment la fonction
Misc.retransmit
qui alloue des tampons différents pour chaque appel.
Cependant, il reste encore quelques précautions à prendre vis à vis du
traitement des erreurs. Le traitement d’une connexion par coprocessus doit
être robuste comme expliqué ci-dessus. En particulier, en cas d’erreur, il
ne faut pas que les autres processus soit affectés, c’est-à-dire que le
coprocessus doit terminer «normalement» en fermant proprement la connexion en
cause et se remettre en attente d’autres connexions. Nous devons d’abord
remplacer l’appel à exit
dans handle_error
car il ne faut
surtout pas tuer le processus en cours. Un appel à Thread.exit
ne
serait pas correct non plus, car la mort d’un coprocessus ne ferme pas ses
descripteurs (partagés), comme le ferait le système à la mort d’un
processus. Une erreur dans le traitement d’une connexion laisserait alors la
connexion ouverte. La solution consiste à lever une exception Exit
qui permet au code de finalisation de faire ce qu’il faut. Nous devons
maintenant protéger
treat_connection
pour rattraper toutes les erreurs: notamment
Exit
mais aussi EPIPE
qui peut être levée si le client ferme
prématurément sa connexion. Nous ferons effectuer ce traitement
par une fonction de protection.
let allow_connection_errors f s = try f s with Exit | Unix_error(EPIPE,_,_) -> () |
let treat_connection s = Misc.co_treatment s (allow_connection_errors proxy_service) in |
Les fonctions décrites dans cette section sont définies dans le module
Condition
.
Le mécanisme de synchronisation par verrous est très simple, mais il n’est pas suffisant: les verrous permettent d’attendre qu’une donnée partagée soit libre, mais il ne permettent pas d’attendre la forme d’une donnée. Remplaçons l’exemple du compteur par une file d’attente (premier entré/premier sorti) partagée entre plusieurs coprocessus. L’ajout d’une valeur dans la queue peut être synchronisé en utilisant un verrou comme ci-dessus, car quelque soit la forme de la queue, on peut toujours y ajouter un élément. Mais qu’en est-il du retrait d’une valeur de la queue? Que faut-il faire lorsque la queue est vide? On ne peut pas garder le verrou en attendant que la queue se remplisse, car cela empêcherait justement un autre coprocessus de remplir la queue. Il faut donc le rendre. Mais comment savoir quand la queue ne sera plus vide, sinon qu’en testant à nouveau périodiquement? Cette solution appelée “attente active” n’est bien sûr pas satisfaisante. Ou bien elle consomme du temps de calcul inutilement (période trop courte) ou bien elle n’est pas assez réactive (période trop longue).
Les conditions permettent de résoudre ce problème. Les conditions sont des objets sur lequel un coprocessus qui possède un verrou peut se mettre en attente jusqu’à ce qu’un autre coprocessus envoie un signal sur cette condition.
Comme les verrous, les conditions sont des structures passives qui peuvent être manipulées par des fonctions de synchronisation. On peut les créer par la fonction create.
val Condition.create : unit -> Condition.t |
Un processus p qui possède déjà un verrou v peut se mettre en attente sur une condition c et le verrou v en appelant wait c v. Le processus p informe le système qu’il est en attente sur la condition c et le verrou v puis libère le verrou v et s’endort. Il ne sera réveillé par le système que lorsqu’un autre processus q aura signalé un changement sur la condition c et lorsque le verrou v sera disponible; le processus p tient alors à nouveau le verrou v.
val Condition.wait : Condition.t -> Mutex.t -> unit |
Attention! c’est une erreur d’appeler Condition.wait
c v sans être en
possession du verrou v.
Le comportement de Condition.wait
vis à vis des signaux est le même que
pour Mutex.lock
.
Lorsqu’un coprocessus signale un changement sur une condition, il peut demander ou bien que tous les coprocessus en attente sur cette condition soient réveillés (broadcast), ou bien qu’un seul parmi ceux-ci soit réveillés (signal).
L’envoi d’un signal ou d’un broadcast sur une condition ne nécessite pas d’être en possession d’un verrou (à la différence de l’attente), dans le sens où cela ne déclenchera pas une erreur «système». Cependant, cela peut parfois être une erreur de programmation.
Le choix entre le réveil d’un coprocessus ou de tous les coprocessus dépend du problème. Pour reprendre l’exemple de la file d’attente, si un coprocessus ajoute un élément dans une queue vide, il n’a pas besoin de réveiller tous les autres puisqu’un seul pourra effectivement retirer cet élément. Par contre, s’il ajoute un nombre d’éléments qui n’est pas connu statiquement ou bien qui est très grand, il doit demander le réveil de tous les coprocessus. Attention! si l’ajout d’un élément dans une queue non vide n’envoie pas de signal, alors l’ajout d’un élément dans une queue vide doit envoyer un broadcast, car il pourrait être immédiatement suivi d’un autre ajout (sans signal) donc se comporter comme un ajout multiple. En résumé, soit chaque ajout envoie un signal, soit seul l’ajout dans une queue vide envoie un broadcast. Le choix entre ces deux stratégies est un pari sur le fait que la queue est généralement vide (première solution) ou généralement non vide (deuxième solution).
Il est fréquent qu’un coprocessus n’ait qu’une approximation de la raison pour laquelle un autre peut être en attente sur une condition. Il va donc signaler sur cette condition par excès simplement lorsque la situation peut avoir évolué pour un coprocessus en attente. Un processus réveillé ne doit donc pas supposer que la situation a évolué pour lui et que la condition pour laquelle il est mis en attente est maintenant remplie. Il doit en général (presque impérativement) tester à nouveau la configuration, et si nécessaire se remettre en attente sur la condition. Il ne s’agit plus cette fois-ci d’une attente active, car cela ne peut se passer que si un coprocessus à émis un signal sur la condition.
Voici une autre raison qui justifie cette discipline: lorsqu’un coprocessus
vient de fournir une ressource en abondance et réveille tous les autres par
un broadcast
, rien n’empêche le premier réveillé d’être très gourmand et
d’épuiser la ressource en totalité. Le second réveillé devra se rendormir en
espérant être plus chanceux la prochaine fois.
Nous pouvons maintenant donner une solution concrète pour les queues
partagées. La structure de queue
définie dans le module Queue
est enrichie avec un verrou et une condition non_empty
.
type 'a t = { queue : 'a Queue.t; lock : Mutex.t; non_empty : Condition.t } let create () = { queue = Queue.create(); lock = Mutex.create(); non_empty = Condition.create() } |
L’ajout n’est jamais bloquant, mais il ne faut pas oublier de signaler la
condition non_empty
lorsque la liste était vide avant ajout, car il est
possible que quelqu’un soit en attente sur la condition.
let add e q = Mutex.lock q.lock; if Queue.length q.queue = 0 then Condition.broadcast q.non_empty; Queue.add e q.queue; Mutex.unlock q.lock;; |
Le retrait est un tout petit peu plus compliqué:
let take q = Mutex.lock q.lock; while Queue.length q.queue = 0 do Condition.wait q.non_empty q.lock done; let x = Queue.take q.queue in Mutex.unlock q.lock; x;; |
Après avoir acquis le verrou, on peut essayer de retirer l’élément de la
queue. Si la queue est vide, il faut se mettre en attente sur la condition
non_empty
. Au réveil, on essayera à nouveau, sachant qu’on a déjà le
verrou.
Comme expliqué ci-dessus, le signal Condition.broadcast q.non_empty
(8) est exécuté par un coprocessus p en étant en
possession du verrou q.lock
. Cela implique qu’un coprocessus lecteur q
exécutant la fonction take
ne peut pas se trouver entre la ligne
13 et 14 où il serait prêt à s’endormir après avoir
vérifié que la queue est vide mais ne l’aurait pas encore fait: dans ce cas,
le signal émis par p serait inopérant et ignoré, le coprocessus q
n’étant pas encore endormi et q s’endormirait et ne serait pas réveillé
par p ayant déjà émis son signal. Le verrou assure donc que soit q est
déjà endormi, soit q n’a pas encore testé l’état de la queue.
Les fonctions décrites dans cette section sont définies dans le module
Event
.
Verrous et conditions permettent ensemble d’exprimer toutes les formes de synchronisation. Toutefois, leur mise en œuvre n’est pas toujours aisée comme le montre l’exemple a priori simple des files d’attente dont le code de synchronisation s’avère a posteriori délicat.
La communication synchrone entre coprocessus par événements est un ensemble
de primitives de communication de plus haut niveau qui tend à faciliter
la programmation concurrente.
L’ensemble des
primitives de la bibliothèque Event
a été initialement développé par John
Reppy pour le langage Standard ML, appelé
Concurrent ML [14]. En OCaml, ces primitives sont
implantées au dessus de la synchronisation plus élémentaire par verrous et
conditions. Les primitives sont regroupées dans le module Event
.
La communication se fait en envoyant des événements au travers de
canaux. Les canaux sont en quelque sorte des “tuyaux légers”:
ils permettent de communiquer entre les coprocessus d’un même programme en
gérant eux-mêmes la synchronisation entre producteurs et consommateurs. Un
canal transportant des valeurs de type 'a
est de type 'a Event.channel
. Les
canaux sont homogènes et transportent donc toujours des valeurs du même
type. Un canal est créé avec la primitive new_channel.
val Event.new_channel : unit -> 'a channel |
L’envoi ou la réception d’un message ne se fait pas directement, mais par l’intermédiaire d’un événement. Un événement élémentaire est “envoyer un message” ou “recevoir un message”. On les construit à l’aide des primitives suivantes:
Le construction d’un message n’a pas d’effet immédiat et se limite à la création d’une structure de donnée décrivant l’action à effectuer. Pour réaliser un événement, le coprocessus doit se synchroniser avec un autre coprocessus souhaitant réaliser l’événement complémentaire. La primitive sync permet la synchronisation du coprocessus pour réaliser l’événement passé en argument.
val Event.sync : 'a event -> 'a |
Ainsi pour envoyer une valeur v sur le canal c, on pourra exécuter
sync
(send
c v). Le coprocessus est suspendu jusqu’à ce que
l’événement se réalise, c’est-à-dire jusqu’à ce qu’un autre coprocessus soit
prêt à recevoir de une valeur sur le canal c. De façon symétrique, un
processus peut se mettre en attente d’un message sur le canal c en
effectuant sync
(receive
c).
Il y a compétitions entre tous les producteurs d’une part et tous les consommateurs d’autre part. Par exemple, si plusieurs coprocessus essayent d’émettre un message sur un canal alors qu’un seul est prêt à le lire, il est un clair qu’un seul producteur réalisera l’événement. Les autres resteront suspendus, sans même s’apercevoir qu’un autre a été “servi” avant eux.
La compétition peut également se produire au sein d’un même coprocessus. Deux événements peuvent être combinés par la primitive choose:
val Event.choose : 'a event list -> 'a event |
L’événement résultant est une offre en parallèle des événements passés en
arguments qui se réalisera lorsque qu’un exactement de ceux-ci se réalisera.
Il bien faut distinguer là l’offre d’un événement de sa réalisation.
L’appel sync
(choose
e1 e2) se synchronise en offrant au choix
deux événements e1 et e2, mais un seul des deux événements sera
effectivement réalisé (l’offre de l’autre événement sera simultanément
annulée). La primitive wrap_abort permet à un événement d’agir
lorsqu’il
est ainsi annulé.
Event.wrap_abort : 'a event -> (unit -> unit) -> 'a event |
L’appel wrap_abort
e f construit un événement e′ équivalent à e
mais tel que si e n’est pas réalisé après sa synchronisation alors la
fonction f est exécutée (cela n’a d’intérêt que si e′ fait parti d’un
événement complexe).
Un coprocessus peut proposer de réaliser un événement sans se bloquer
(un peu à la manière de Mutex.try_lock
).
val Event.poll : 'a event -> 'a option |
L’appel Event.pool
e va offrir l’événement e mais si
celui-ci ne peut pas être immédiatement réalisé, il retire l’offre au lieu
de se bloquer et tout se passera comme s’il n’avait rien fait (ou plus
exactement comme si l’expression Event.pool
e avait été remplacée par
la valeur None
). Par contre, si l’événement peut se réaliser
immédiatement, alors tout se passera comme si le coprocessus avec effectué
Event.sync
e, sauf que c’est la valeur Some
v plutôt que v qui
est retournée.
Exemple:
Dans l’exemple 7.3 du crible d’Ératosthène la communication entre les différents coprocessus s’effectue par des tuyaux comme dans le programme d’origine, donc en utilisant la mémoire du système (le tuyau) comme intermédiaire. On peut penser qu’il serait plus efficace de communiquer directement en utilisant la mémoire du processus. Une solution simple consisterait à remplacer le tuyau par un canal sur lequel sont envoyés les entiers.
Passer des entiers dans le canal n’est pas suffisant car il faut aussi
pouvoir détecter la fin du flux. Le plus simple est donc de passer des
éléments de la forme
Some
n et de terminer en envoyant la valeur None
.
Pour simplifier et souligner les changements en les minimisant, nous allons
récupérer le code de l’exemple 5.2. Pour cela, nous
simulons les tuyaux et les fonctions de lecture/écriture sur un tuyau par
des canaux et de fonctions de lecture/écriture sur les canaux.
Il suffit de reprendre la version précédente du programme et de modifier les
fonctions d’entrée-sortie pour qu’elles lisent et écrivent dans un canal
plutôt que dans un tampon d’entrée-sortie de la bibliothèque Pervasives
, par
exemple en insérant à la ligne 2 le code suivant:
let pipe () = let c = Event.new_channel() in c, c let out_channel_of_descr x = x let in_channel_of_descr x = x let input_int chan = match Event.sync (Event.receive chan) with Some v -> v | None -> raise End_of_file let output_int chan x = Event.sync (Event.send chan (Some x)) let close_out chan = Event.sync (Event.send chan None);; |
Toutefois, si l’on compare l’efficacité de cette version avec la précédente, on trouve qu’elle est deux fois plus lente. La communication de chaque entier requiert une synchronisation entre deux coprocessus donc plusieurs appels systèmes pour prendre et relâcher le verrou. Au contraire, la communication par tuyau utilise des entrées/sorties temporisées qui permettent d’échanger plusieurs milliers d’entiers à chaque appel système.
Pour être juste, il faudrait donc également fournir une communication temporisée sur les canaux en utilisant le canal seulement pour échanger un paquet d’entiers. Le fils peut accumuler les résultats dans une queue qu’il est seul à posséder, donc dans laquelle il peut écrire sans se synchroniser. Quand la queue est pleine ou sur demande explicite, il la vide en se synchronisant sur le canal. Le père à lui même sa propre queue qu’il reçoit en se synchronisant et qu’il vide petit à petit.
Voici une solution (qui remplace des lignes 2–0 ci-dessus):
type 'a buffered = { c : 'a Queue.t Event.channel; mutable q : 'a Queue.t; size : int } let pipe () = let c = Event.new_channel() in c, c;; let size = 1024;; let out_channel_of_descr chan = { c = chan; q = Queue.create(); size = size };; let in_channel_of_descr = out_channel_of_descr;; let input_int chan = if Queue.length chan.q = 0 then begin let q = Event.sync (Event.receive chan.c) in if Queue.length q > 0 then chan.q <- q else raise End_of_file end; Queue.take chan.q;; let flush_out chan = if Queue.length chan.q > 0 then Event.sync (Event.send chan.c chan.q); chan.q <- Queue.create();; let output_int chan x = if Queue.length chan.q = size then flush_out chan; Queue.add x chan.q let close_out chan = flush_out chan; Event.sync (Event.send chan.c chan.q);; |
Cette version permet de retrouver une efficacité comparable à la version avec tuyau (mais pas meilleure).
Si l’on compare avec la version d’origine avec processus et tuyaux, il y a deux sources potentielles de gain: d’une part les coprocessus sont plus sobres et coûtent moins cher à leur lancement. D’autre part, la communication par le canal se contente de passer un pointeur sans recopie. Mais ces gains ne sont pas perceptibles ici, car le nombre de coprocessus créés et les structures échangées ne sont pas suffisamment grands devant le coût de l’appel système et devant les temps de calcul.
En conclusion, on peut retenir que la communication entre coprocessus a un coût qui peut aller jusqu’à celui d’un appel système (si le processus doit être suspendu) et que ce coût peut être significativement réduit en temporisant les communications pour communiquer moins souvent de plus grosses structures.
Pour éviter l’écroulement de la machine, on peut limiter le nombre de coprocessus à une valeur raisonnable au delà laquelle le coût de la gestion des tâches dépasse la latence du service des requêtes (temps passés à attendre des données sur le disque, etc.). Au delà, on pourra garder quelques connexions en attente d’être traitées, puis finalement refuser les connexions. Lorsque la charge diminue et que le nombre de coprocessus est au-delà de la valeur “idéale”, certains se laissent mourir, les autres restent prêts pour les prochaines requêtes.
Transformer l’exemple 7.5 pour arriver à cette architecture.
Le système Unix n’a pas été conçu au départ pour fournir du support pour les
coprocessus. Toutefois, la plupart des implémentations modernes d’Unix
offrent maintenant un tel support. Malgré tout, les coprocessus restent une
pièce rapportée qui est parfois apparente. Par exemple, dès l’utilisation de
coprocessus il est fortement déconseillé d’utiliser fork
autrement que
pour faire exec
immédiatement après. En effet, fork
copie le coprocessus
courant qui devient un processus estropié car s’exécutant en croyant avoir
des coprocessus qui en fait n’existent pas. Le père lui continue à
s’exécuter a priori normalement. Le cas particulier d’un appel à fork où le
fils lance immédiatement un autre programme ne pose quant à lui pas de
problème. Heureusement! car c’est le seul moyen de lancer d’autres
programmes.
Inversement, on peut faire fork
(non suivi de exec
) puis lancer ensuite
plusieurs coprocessus dans le fils et le père, sans aucun problème.
Lorsque le système d’exploitation sous-jacent possède des coprocessus, OCaml peut fournir une implémentation native des coprocessus, en laissant le plus possible leur gestion au système d’exploitation. Chaque coprocessus réside alors dans un processus Unix différent mais partageant le même espace mémoire.
Lorsque le système ne fournit pas de support pour les coprocessus, OCaml peut les émuler. Tous les coprocessus s’exécutent alors dans le même processus Unix et leur gestion y compris leur ordonnancement est effectuée par l’environnement d’exécution d’OCaml. Toutefois, cette implémentation n’est disponible qu’avec la compilation vers du bytecode.
Le système OCaml offre une même interface programmatique pour les versions native et simulée des coprocessus. L’implémentation des coprocessus est donc dédoublée. Une implémentation pour la version émulée qui comporte son propre contrôleur de tâches et une autre implémentation qui s’appuie sur les coprocessus POSIX (1003.1c) et relève les fonctions des bibliothèques correspondantes au niveau du langage OCaml. Au passage, le langage OCaml effectue quelques tâches administratives simples et assure une interface identique avec la version émulée. Cela garantit qu’un programme compilable sur une architecture Unix reste compilable sur une autre architecture Unix. Toutefois, le fait que les processus soient émulés ou natifs peut changer la synchronisation des appels à des bibliothèques C, et ainsi changer, malgré tout, la sémantique du programme. Il faut donc prendre quelques précautions avant de considérer qu’un programme se comportera de la même façon dans les deux versions. Dans ce chapitre, le discours vaut principalement pour les deux implantations, mais rapelons qu’à défaut, nous avons pris le point de vue d’une implantation native.
Pour utiliser les coprocessus émulés, il faut passer l’option -vmthreads
à
la place de -threads
au compilateur ocamlc
. Cette option n’est pas
acceptée par le compilateur ocamlopt
.
L’implémentation des coprocessus en OCaml doit faire face à une des
particularités du langage OCaml qui est la gestion automatique de la
mémoire et sa consommation importante de données allouées. La solution
retenue, qui est la plus simple et aussi généralement la plus efficace, est
de séqentialiser l’exécution du code OCaml dans tous les coprocessus: un
verrou de l’environnement d’exécution empêche deux coprocessus d’exécuter du
code OCaml simultanément. Cela semble contraire à l’idée même des
coprocessus, mais il n’en est rien. Car le verrou est évidemment
relâché avant les appels systèmes bloquants et repris au
retour. D’autres coprocessus peuvent donc prendre la main à ce moment
là. Un cas particulier d’appel système étant l’appel à sched_yield
effectué à intervalles de temps réguliers pour permettre de suspendre le
coprocessus en cours d’exécution et donner la main à d’autres.
Sur une machine multiprocesseur, la seule source de vrai parallélisme provient de l’exécution du code C et des appels systèmes. Sur une machine mono-processeur, le fait que le code OCaml soit séquentialisé n’est pas vraiment perceptible.
Le programmeur ne peut pas pour autant s’appuyer sur cette séquentialisation, car un coprocessus peut donner la main à un autre à peu près à n’importe quel moment. À une exception près, la séquentialisation garantit la cohérence de la mémoire: deux coprocessus ont toujours la même vision de la mémoire, sauf, peut-être, lorsqu’ils exécutent du code C. En effet, le passage du verrou implique une synchronisation de la mémoire: une lecture à une adresse effectuée par un coprocessus qui se situe dans le temps après une opération d’écriture à cette même adresse par un autre coprocessus retournera toujours la valeur fraîchement écrite, sans besoin de synchronisation supplémentaire.
D’une façon générale, l’utilisation des signaux est déjà délicate avec un seul coprocessus en raison de leur caractère asynchrone. Elle l’est encore plus en présence de plusieurs coprocessus car s’ajoute alors de nouvelles difficultés: à quel coprocessus doit être envoyé le signal? À tous, au principal, ou à celui en cours d’exécution? Que se passe-t-il si un coprocessus envoie un signal à un autre? En fait, les coprocessus ont été implantés avant de bien répondre à ces questions, et différentes implantations peuvent se comporter différemment par rapport aux signaux.
Les opérations Thread.join
, Mutex.lock
, and Condition.wait
, bien
qu’étant des appels systèmes longs, ne sont pas interruptibles par un
signal. (Elle ne peuvent donc pas échouer avec l’erreur EINTR
). Si un
signal est envoyé pendant l’attente, il sera reçu et traité lorsque l’appel
retournera.
La norme POSIX spécifie que le handler des signaux est partagé entre tous les coprocessus et au contraire le masque des signaux est propre à chaque coprocessus et hérité à la création d’un coprocessus. Mais le comportement des coprocessus vis-à-vis des signaux reste largement sous-spécifié et donc non portable.
Il est donc préférable d’éviter autant que possible l’utilisation de signaux
asynchrones (du type sigalrm
, sigvtalrm
, sigchld
, etc.) avec des
coprocessus. Ceux-ci peuvent être bloqués et consultés avec
Thread.wait_signal
. On peut dédier un coprocessus au traitement des
signaux: ne faisant que cela, il peut se mettre en attente sur la réception
de signaux et entreprendre les actions nécessaires et mettre à jour certaines
informations consultées par d’autres coprocessus.
De plus, les coprocessus OCaml (depuis la version 3.08) utilisent le
signal sigvtalarm
de façon interne pour effectuer la préemption des
coprocessus. Ce signal est donc réservé et ne doit pas être utilisé par le
programme lui-même, car il y a un risque d’interférence.