Previous Up Next

Chapter 7  Les coprocessus

Un coprocessus, aussi appelé processus léger et thread en anglais, est un fil d’exécution d’un programme pouvant s’exécuter en parallèle avec d’autres coprocessus du même programme.


Ce chapitre décrit les appels systèmes Unix permettant à un programme de créer des coprocessus et de les synchroniser à l’aide de verrous, de conditions ou d’événements. Il s’appuie sur la bibliothèque OCaml Thread et les bibliothèques Mutex, Condition. Il présente également la communication par événements synchrones fournie par la bibliothèque et Event.

7.1  Généralités

La création d’un coprocessus est très différente de l’opération fork qui crée une copie du processus courant (donc une copie du programme): les espaces mémoire du père et du fils sont totalement disjoints après l’exécution de fork et les processus ne peuvent communiquer que par un appel système (comme écrire ou lire dans un fichier ou dans un tuyau).

Au contraire, tous les coprocessus d’un même programme partagent le même espace d’adressage. Les seules données qui les différencient et qui ne sont pas partagées sont leur identité et leur pile d’exécution, ainsi que quelques informations pour le système (masque des signaux, état des verrous et conditions, etc.). De ce point de vue, les coprocessus ressemblent aux coroutines. Les coprocessus d’un même programme forment un groupe: ils sont tous traités de la même façon, sauf un, le coprocessus principal, qui a été créé au démarrage du programme et dont la terminaison entraîne l’arrêt de tous les autres coprocessus et l’arrêt du programme. Lorsque nous parlons de plusieurs coprocessus, nous considérons implicitement qu’ils s’agit des coprocessus d’un même programme.

À la différence des coroutines, qui ne peuvent pas s’exécuter en parallèle —l’une devant passer explicitement la main avant que l’autre ne puisse s’exécuter à son tour— les coprocessus s’exécutent en parallèle ou du moins tout se passe comme s’ils le faisaient, car ils peuvent être interrompus de façon préemptive par le système pour donner la main à d’autres coprocessus. De ce point de vue, les coprocessus ressemblent aux processus.

Le partage de l’espace mémoire permet aux coprocessus de communiquer directement entre eux par leur mémoire commune. Bien qu’en principe, deux processus n’aient pas besoin pour cela de passer par le système d’exploitation, le fait qu’ils s’exécutent en parallèle les oblige à se synchroniser au moment de la communication (l’un devant finir d’écrire avant que l’autre ne commence à lire), ce qui nécessite, en général, de passer par le système d’exploitation. La synchronisation entre coprocessus, quasiment indispensable pour toute communication, reste souvent une partie difficile de la programmation avec des coprocessus. Elle peut se faire à l’aide de verrous et de conditions, ou bien de façon plus évoluée, en communiquant par des événements.

Le gain à l’utilisation des coprocessus par rapport aux processus reste le coût moindre à leur création et la possibilité d’échanger de grosses structures de données sans recopie, simplement en s’échangeant un pointeur. Inversement, l’utilisation des coprocessus a un coût qui est de devoir bien gérer la synchronisation entre eux, y compris en cas d’erreur fatale dans l’un d’eux. En particulier un coprocesseur doit bien faire attention de libérer ses verrous et de préserver ses invariant avant de s’arrêter. Aussi, on pourra préférer les processus aux coprocessus lorsqu’on ne profite pas réellement des avantages de ces derniers.

Mise en œuvre en OCaml

Pour compiler une application utilisant des coprocessus natifs il faut faire:

ocamlc -thread unix.cma threads.cma -o prog mod1.ml mod2.ml mod3.ml
ocamlopt -thread unix.cmxa threads.cmxa -o prog mod1.ml mod2.ml mod3.ml

Si votre installation ne supporte pas les coprocessus natifs, vous pouvez voir à la section 7.8 ou dans le manuel comment utiliser les coprocessus simulés. Le discours et les exemples de ce chapitre supposent des coprocessus natifs et ne s’applique pas, en général, aux coprocessus simulés.

7.2  Création et terminaison des coprocessus

Les fonctions décrites dans cette section sont définies dans le module Thread.

L’appel système create f a crée un nouveau coprocessus dans lequel l’application de fonction f a est exécutée. La création d’un coprocessus retourne à l’appelant une poignée sur le coprocessus fraîchement créé qui peut être utilisée pour son contrôle.

     
   val Thread.create : ('a -> 'b) -> 'a -> t

Le calcul s’exécute de façon concurrente avec les calculs effectués par les autres coprocessus du programme. Le coprocessus termine lorsque l’application f a termine. Le résultat du calcul est simplement ignoré. Si le coprocessus termine par une exception non rattrapée, celle-ci n’est pas propagée dans le coprocessus appelant ou principal mais simplement ignorée après affichage d’un message dans la sortie d’erreur. (Les autres coprocessus ont a priori évolué indépendemment et ne seraient pas en mesure de recevoir l’exception.)

Un coprocessus peut aussi se terminer prématurément en appelant la fonction exit (de la bibliothèque Thread), à ne pas confondre avec la fonction de même nom de la bibliothèque Pervasives qui termine le programme tout entier, i.e. tous ses coprocessus. Le coprocessus principal d’un programme, qui est celui qui a lancé le premier d’autres coprocessus, exécute implicitement la fonction Pervasives.exit lorsqu’il termine.

Si un coprocessus termine avant le coprocessus principal, alors il est désalloué immédiatement, et ne devient pas un fantôme comme dans le cas d’un processus Unix créé par fork. La bibliothèque OCaml se charge de cette gestion.

Nous en savons déjà assez pour proposer une alternative au modèle du serveur concurrent par «fork» (ou «double fork») vu dans le cours précédent en faisant effectuer le traitement par un coprocessus plutôt que par un processus fils. Pour établir un tel serveur, il nous suffit de proposer une variante Misc.co_treatment de la fonction Misc.fork_treatment définie dans le chapitre 6.

     
   let co_treatment server_sock service (client_descr_ as client) =
     try ignore (Thread.create service client)
     with exn -> close client_descrraise exn;;

Si le coprocessus a pu être créé, le traitement est entièrement pris en compte par la fonction service y compris la fermeture de client_descr. Sinon, on ferme le descripteur client_descr (le client est abandonné), et on laisse le programme principal traiter l’erreur.

Attention, toute la difficulté du coserveur est cachée dans la fonction service qui doit traiter la connexion avec robustesse et jusqu’à la déconnexion. Dans le cas du serveur concurrent où le service est exécuté par un autre processus, une erreur fatale pendant le service conduisant à la mort prématurée du service produit par défaut le bon comportement qui est de fermer la connexion, car le système ferme les descripteurs à la mort d’un processus. Il n’en est plus de même dans le cas où le service est exécuté par un coprocessus, car les descripteurs entre les différents coprocessus sont par défaut partagés et ne sont pas fermés à la mort du coprocessus. C’est donc au coprocessus de fermer ses descripteurs avant de mourir. De plus, un coprocessus ne peut pas appeler Pervasives.exit dans le cas d’une erreur fatale dans le traitement d’un service, car celle-ci arrêterait non seulement le service mais aussi le serveur. Appelé Thread.exit n’est souvent pas non plus une solution, car on risque de ne pas avoir bien désalloué les ressources ouvertes, et en particulier la connexion. Une solution consiste à lever une exception signifiant un arrêt fatal (par exemple une exception Exit) provoquant l’exécution du code de finalisation à sa remontée. Pour des raisons analogues, il est essentiel que le signal sigpipe soit dans un traitement du service par coprocessus, remplaçant l’arrêt immédiat du coprocessus par la levée d’une exception EPIPE.

7.3  Mise en attente

Les fonctions décrites dans cette section sont définies dans le module Thread.

L’appel système join permet à un coprocessus d’en attendre un autre.

     
   val Thread.join : t -> unit

Le coprocessus appelant est alors suspendu jusqu’à ce que celui dont l’identité est passée en argument ait terminé son exécution. Cet appel peut également être utilisé par le coprocessus principal pour attendre que tous les autres aient retourné avant de terminer lui-même et de terminer le programme (le comportement par défaut étant de tuer les autres coprocessus sans attendre leur terminaison).

Bien que cet appel soit bloquant donc du type «long», il est relancé automatiquement à la réception d’un signal: il est effectivement interrompu par un signal, le handler est traité, puis l’appel est relancé. On ne revient donc de l’appel que quand le coprocessus à réellement terminé et l’appel ne doit jamais lever l’exception EINTR. Du point de vue du programmeur OCaml, cela se comporte comme si le signal était reçu au moment où l’appel retourne.

Un coprocessus ne retourne pas, car il s’exécute de façon asynchrone. Mais son action peut être observée — heureusement! — par ses effets de bords. Par exemple, un coprocessus peut placer le résultat d’un calcul dans une référence qu’un autre coprocessus ira consulter après s’être assuré de la terminaison du calcul. Nous illustrons cela dans l’exemple suivant.

     
   exception Exited
   type 'a result = Value of 'a | Exception of exn
   let eval f x = try Value (f xwith z -> Exception z
   let coexec (f : 'a -> 'b) (x : 'a) : unit -> 'b =
     let result = ref (Exception Exitedin
     let p = Thread.create (fun x -> result := eval f xx in
     function() ->
       match join p; !result with
       | Value v -> v
       | Exception exn -> raise exn;;
   
   let v1 = coexec succ 4 and v2 = coexec succ 5 in v1()+v2();;

Le système peut suspendre un coprocessus pour donner temporairement la main à un autre ou parce qu’il est en attente d’une ressource utilisée par un de ses coprocessus (verrous et conditions, par exemple) ou par un autre processus (descripteur de fichier, par exemple). Un coprocessus peut également se suspendre de sa propre initiative. La fonction yield permet à un coprocessus de redonner la main prématurément (sans attendre la préemption par le système).

     
   val Thread.yield : unit -> unit

C’est une indication pour le gestionnaire de coprocessus, mais son effet peut être nul, par exemple, si aucun coprocessus ne peut s’exécuter immédiatement. Le système peut donc décider de redonner la main au même coprocessus.

Inversement, il n’est pas nécessaire d’exécuter yield pour permettre à d’autres coprocessus de s’exécuter, car le système se réserve le droit d’exécuter lui-même la commande yield à tout moment. En fait, il exerce ce droit à intervalles de temps suffisamment rapprochés pour permettre à d’autres coprocessus de s’exécuter et donner l’illusion à l’utilisateur que les coprocessus s’exécutent en parallèle, même sur une machine mono-processeur.

Exemple:

On peut reprendre et modifier l’exemple 3.3 pour utiliser des coprocessus plutôt que des processus.

     
   let rec psearch k cond v =
     let n = Array.length v in
     let slice i = Array.sub v (i * k) (min k (n - i * k)) in
     let slices = Array.init (n/kslice in
     let found = ref false in
     let pcond v = if !found then Thread.exit(); cond v in
     let search v = if simple_search pcond v then found := true in
     let proc_list = Array.map (Thread.create searchslices in
     Array.iter Thread.join proc_list;
     !found;;

La fonction psearch k f v recherche avec k coprocessus en parallèle une occurrence dans le tableau satisfaisant la fonction f. La fonction pcond permet d’interrompre la recherche en cours dès qu’une réponse a été trouvée. Tous les coprocessus partagent la même référence found: ils peuvent donc y accéder de façon concurrente. Il n’y a pas de section critique entre les différents coprocessus car s’ils écrivent en parallèle dans cette ressource, ils écrivent la même valeur. Il est important que les coprocessus n’écrivent pas le résultat de la recherche quand celui-ci est faux! par exemple le remplacement de la ligne 7 par

     
   let search v = found := !found && simple_search pcond v

ou même:

     
   let search v = let r = simple_search pcond v in found := !found && r

serait incorrect.

la recherche en parallèle est intéressante même sur une machine mono-processeur si la comparaison des éléments peut être bloquée temporairement (par exemple par des accès disques ou, mieux, des connexions réseau). Dans ce cas, le coprocessus qui effectue la recherche passe la main à un autre et la machine peut donc continuer le calcul sur une autre partie du tableau et revenir au coprocessus bloqué lorsque sa ressource sera libérée.

L’accès à certains éléments peut avoir une latence importante, de l’ordre de la seconde s’il faut récupérer de l’information sur le réseau. Dans ce cas, la différence de comportement entre une recherche séquentielle et une recherche en parallèle devient flagrante.

Exercice 18   Paralléliser le tri rapide (quicksort) sur des tableaux.
(Voir le corrigé)

Les autres formes de suspension sont liées à des ressources du système d’exploitation. Un coprocessus peut se suspendre pendant un certain temps en appelant delay s. Une fois s secondes écoulées, il pourra être relancé.

     
   val Thread.delay : float -> unit

Cette primitive est fournie pour des raisons de portabilité avec les coprocessus simulés, mais Thread.delay t est simplement une abréviation pour ignore (Unix.select [] [] [] t). Cet appel, contrairement à Thread.join, n’est pas relancé lorsqu’il est interrompu par un signal.

Pour synchroniser un coprocessus avec une opération externe, on peut utiliser la commande select d’Unix. Bien entendu, celle-ci ne bloquera que le coprocessus appelant et non le programme tout entier. (Le module Thread redéfinit cette fonction, car dans le cas des coprocessus simulés, elle ne doit pas appeler directement cette du module Unix, ce qui bloquerait le programme tout entier, donc tous les coprocessus. Il faut donc utiliser Thread.select et non Unix.select, même si les deux sont équivalents dans le cas des coprocessus natifs.)

Exemple:

Pour faire fonctionnner le crible d’Ératosthène avec des coprocessus plutôt que par duplication de processus Unix, il suffit de remplacer les lignes 30–41 par

     
       let p = Thread.create filter (in_channel_of_descr fd_inin
       let output = out_channel_of_descr fd_out in
       try
         while true do
           let n = input_int input in
           if List.exists (fun m -> n mod m = 0) first_primes then ()
           else output_int output n
         done;
       with End_of_file ->
         close_out output;
         Thread.join p

et les lignes 46–52 par

     
     let k = Thread.create filter (in_channel_of_descr fd_inin
     let output = out_channel_of_descr fd_out in
     generate len output;
     close_out output;
     Thread.join k;;

Toutefois, il ne faut espérer aucun gain significatif sur cet exemple qui utilise peu de processus par rapport au temps de calcul.

7.4  Synchronisation entre coprocessus: les verrous

Les fonctions de cette section sont définies dans le module Mutex (pour Mutual exclusion, en anglais).

Nous avons évoqué ci-dessus un problème d’accès concurrent aux ressources mutables. En particulier, le scénario suivant illustre le problème d’accès aux ressources partagées. Considérons un compteur c et deux processus p et q, chacun incrémentant en parallèle le même compteur c.


     Temps 
Coprocessus p      lit k        écrit k+1  
Coprocessus q         lit k  écrit k+1     
Valeur de c      k  k  k+1  k+1  
Figure 7.1: Compétition pour l’accès à une ressource partagée.

Supposons le scénario décrit dans la Figure 7.1. Le coprocessus p lit la valeur du compteur c, puis donne la main à q. À son tour, q lit la valeur de c puis écrit la valeur k+1 dans c. Le processus p reprend la main et écrit la valeur k+1 dans c. La valeur finale de c est donc k+1 au lieu de k+2.

Ce problème classique peut être résolu en utilisant des verrous qui empêchent l’entrelacement arbitraire de p et q.

Les verrous sont des objets partagés par l’ensemble des coprocessus d’un même programme qui ne peuvent être possédés que par un seul coprocessus à la fois. Un verrou est créé par la fonction create.

     
   val Mutex.create : unit -> Mutex.t

Cette opération ne fait que construire le verrou mais ne le bloque pas. Pour prendre un verrou déjà existant, il faut appeler la fonction lock en lui passant le verrou en argument. Si le verrou est possédé par un autre processus, alors l’exécution du processus appelant est gelée jusqu’à ce que le verrou soit libéré. Sinon le verrou est libre et l’exécution continue en empêchant tout autre processus d’acquérir le verrou jusqu’à ce que celui-ci soit libéré. La libération d’un verrou doit se faire explicitement par le processus qui le possède, en appelant unlock.

     
   val Mutex.lock : Mutex.t -> unit
   val Mutex.unlock : Mutex.t -> unit

L’appel système Mutex.lock se comporte comme Thread.join vis-à-vis des signaux: si le coprocessus reçoit un signal pendant qu’il exécute Mutex.lock, le signal sera pris en compte (i.e. le runtime OCaml informé du déclanchement du signal), mais le coprocessus se remet en attente de telle façon que Mutex.lock ne retourne effectivement que lorsque le lock a effectivement été acquis et bien sûr Mutex.lock ne levera pas l’exception EINTR. Le traitement réel du signal par OCaml se fera seulement au retour de Mutex.lock.

On peut également tenter de prendre un verrou sans bloquer en appelant try_lock

     
   val Mutex.try_lock : Mutex.t -> bool

Cette fonction retourne true si le verrou a pu être pris (il était donc libre) et false sinon. Dans ce dernier cas, l’exécution n’est pas suspendue, puisque le verrou n’est pas pris. Le coprocessus peut donc faire autre chose et éventuellement revenir tenter sa chance plus tard.

Exemple:

Incrémenter un compteur global utilisé par plusieurs coprocessus pose un problème de synchronisation: les instants entre la lecture de la valeur du compteur et l’écriture de la valeur incrémentée sont dans une région critique, i.e. deux coprocessus ne doivent pas être en même temps dans cette région. La synchronisation peut facilement être gérée par un verrou.

     
   type counter = { lock : Mutex.tmutable counter : int }
   let newcounter() = { lock = Mutex.create(); counter = 0 }
   let addtocounter c k =
     Mutex.lock c.lock;
     c.counter <- c.counter + k;
     Mutex.unlock c.lock;;

La seule consultation du compteur ne pose pas de problème. Elle peut être effectuée en parallèle avec une modification du compteur, le résultat sera simplement la valeur du compteur juste avant ou juste après sa modification, les deux réponses étant cohérentes.

Un motif fréquent est la prise de verrou temporaire pendant un appel de fonction. Il faut bien sûr prendre soin de le relâcher à la fin de l’appel que l’appel ait réussi ou échoué. On peut abstraire ce comportement dans une fonction de bibliothèque:

     
   let run_with_lock l f x =
     Mutex.lock ltry_finalize f x Mutex.unlock l

Dans l’exemple précédent, on aurait ainsi pu écrire:

     
   let addtocounter c =
     Misc.run_with_lock c.lock (fun k -> c.counter <- c.counter + k)

Exemple:

Une alternative au modèle du serveur avec coprocessus est de lancer un nombre de coprocessus à l’avance qui traitent les requêtes en parallèle.

     
   val tcp_farm_server :
     int -> (file_descr -> file_descr * sockaddr -> 'a) -> sockaddr -> unit

La fonction tcp_farm_server se comporte comme tcp_server mais prend un argument supplémentaire qui est le nombre de coprocessus à lancer qui vont chacun devenir serveurs sur la même adresse. L’intérêt d’une ferme de coprocessus est de réduire le temps de traitement de chaque connexion en éliminant le temps de création d’un coprocessus pour ce traitement, puisque ceux-ci sont créés une fois pour toute.

     
   let tcp_farm_server n treat_connection addr =
     let server_sock = Misc.install_tcp_server_socket addr in
     let mutex = Mutex.create() in
     let rec serve () =
       let client =
         Misc.run_with_lock mutex
           (Misc.restart_on_EINTR acceptserver_sock in
       treat_connection server_sock client;
       serve () in
     for i = 1 to n-1 do ignore (Thread.create serve ()) done;
     serve ();;

La seule précaution à prendre est d’assurer l’exclusion mutuelle autour de accept afin qu’un seul des coprocessus n’accepte une connexion au même moment. L’idée est que la fonction treat_connection fasse un traitement séquentiel, mais ce n’est pas une obligation et on peut effectivement combiner une ferme de processus avec la création de nouveaux coprocessus, ce qui peut s’ajuster dynamiquement selon la charge du service.

La prise et le relâchement d’un verrou est une opération peu coûteuse lorsqu’elle réussit sans bloquer. Elle est en général implémentée par une seule instruction «test-and-set» que possèdent tous les processeurs modernes (plus d’autres petits coûts induits éventuels tels que la mise à jour des caches). Par contre, lorsque le verrou n’est pas disponible, le processus doit être suspendu et reprogrammé plus tard, ce qui induit alors un coût supplémentaire significatif. Il faut donc retenir que c’est la suspension réelle d’un processus pour donner la main à un autre et non sa suspension potentielle lors de la prise d’un verrou qui est pénalisante. En conséquence, on aura presque toujours intérêt à relâcher un verrou dès que possible pour le reprendre plus tard si nécessaire plutôt que d’éviter ces deux opérations, ce qui aurait pour effet d’agrandir la région critique et donc la fréquence avec laquelle un autre coprocessus se trouvera effectivement en compétition pour le verrou et dans l’obligation de se suspendre.

Les verrous réduisent l’entrelacement. En contrepartie, ils risquent de provoquer des situations d’interblocage. Par exemple, il y a interblocage si le coprocessus p attend un verrou v possédé par le coprocessus q qui lui-même attend un verrou u possédé par p. (Dans le pire des cas, un processus attend un verrou qu’il possède lui-même...) La programmation concurrente est difficile, et se prémunir contre les situations d’interblocage n’est pas toujours facile. Une façon simple et souvent possible d’éviter cette situation consiste à définir une hiérarchie entre les verrous et de s’assurer que l’ordre dans lequel on prendra dynamiquement les verrous respecte la hiérarchie: on ne prend jamais un verrou qui n’est pas dominé par tous les verrous que l’on possède déjà.

7.5  Exemple complet: relais HTTP

On se propose de reprendre le relais HTTP développé dans le chapitre précédent pour servir les requêtes par des coprocessus.

Intuitivement, il suffit de remplacer la fonction establish_server qui crée un processus clone par une fonction qui crée un coprocessus. Il faut cependant prendre quelques précautions... En effet, la contrepartie des coprocessus est qu’ils partagent tous le même espace mémoire. Il faut donc s’assurer que les coprocessus ne vont pas se “marcher sur les pieds” l’un écrasant ce que vient juste de faire l’autre. Cela se produit typiquement lorsque deux coprocessus modifient en parallèle une même structure mutable.

Dans le cas du serveur HTTP, il y a quelques changements à effectuer. Commençons par régler les problèmes d’accès aux ressources. La fonction proxy_service qui effectue le traitement de la connexion et décrite à la section 6.13 appelle, par l’intermédiaire des fonctions parse_host, parse_url et parse_request, la fonction regexp_match qui utilise la bibliothèque Str. Or, cette bibliothèque n’est pas ré-entrante (le résultat de la dernière recherche est mémorisé dans une variable globale). Cet exemple montre qu’il faut également se méfier des appels de fonctions qui sous un air bien innocent cachent des collisions potentielles. Dans ce cas, nous n’allons pas réécrire la bibliothèque Str mais simplement séquentialiser son utilisation. Il suffit (et il n’y a pas vraiment d’autres choix) de protéger les appels à cette bibliothèque par des verrous. Il faut quand même prendre la précaution de bien libérer le verrou au retour de la fonction, pour un retour anormal par une levée d’exception.

Pour modifier au minimum le code existant, il suffit de renommer la définition de regexp_match du module Url en unsafe_regexp_match puis de définir regexp_match comme une version protégée de unsafe_regexp_match.

     
   let strlock = Mutex.create();;
   let regexp_match r string =
     Misc.run_with_lock strlock (unsafe_regexp_match rstring;;

La modification à l’air minime. Il faut toutefois remarquer que la fonction regexp_match regroupe à la fois le filtrage de l’expression et l’extraction de la chaîne filtrée. Il eût bien sûr été incorrect de protéger individuellement les fonctions Str.string_match et Str.matched_group.

Une autre solution serait de réécrire les fonctions d’analyse sans utiliser la bibliothèque Str. Mais il n’y a pas de raison pour un tel choix tant que la synchronisation des primitives de la bibliothèque reste simplement réalisable et ne s’avère pas être une source d’inefficacité. Évidemment, une meilleure solution serait que la bibliothèque Str soit ré-entrante en premier lieu.

Les autres fonctions appelées sont bien réentrantes, notamment la fonction Misc.retransmit qui alloue des tampons différents pour chaque appel.

Cependant, il reste encore quelques précautions à prendre vis à vis du traitement des erreurs. Le traitement d’une connexion par coprocessus doit être robuste comme expliqué ci-dessus. En particulier, en cas d’erreur, il ne faut pas que les autres processus soit affectés, c’est-à-dire que le coprocessus doit terminer «normalement» en fermant proprement la connexion en cause et se remettre en attente d’autres connexions. Nous devons d’abord remplacer l’appel à exit dans handle_error car il ne faut surtout pas tuer le processus en cours. Un appel à Thread.exit ne serait pas correct non plus, car la mort d’un coprocessus ne ferme pas ses descripteurs (partagés), comme le ferait le système à la mort d’un processus. Une erreur dans le traitement d’une connexion laisserait alors la connexion ouverte. La solution consiste à lever une exception Exit qui permet au code de finalisation de faire ce qu’il faut. Nous devons maintenant protéger treat_connection pour rattraper toutes les erreurs: notamment Exit mais aussi EPIPE qui peut être levée si le client ferme prématurément sa connexion. Nous ferons effectuer ce traitement par une fonction de protection.

     
   let allow_connection_errors f s =
     try f s with Exit | Unix_error(EPIPE,_,_) -> ()
     
     let treat_connection s =
       Misc.co_treatment s (allow_connection_errors proxy_servicein
Exercice 19   Réécrire la version du proxy pour le protocole HTTP/1.1 à l’aide de coprocessus.

7.6  Les conditions

Les fonctions décrites dans cette section sont définies dans le module Condition.

Le mécanisme de synchronisation par verrous est très simple, mais il n’est pas suffisant: les verrous permettent d’attendre qu’une donnée partagée soit libre, mais il ne permettent pas d’attendre la forme d’une donnée. Remplaçons l’exemple du compteur par une file d’attente (premier entré/premier sorti) partagée entre plusieurs coprocessus. L’ajout d’une valeur dans la queue peut être synchronisé en utilisant un verrou comme ci-dessus, car quelque soit la forme de la queue, on peut toujours y ajouter un élément. Mais qu’en est-il du retrait d’une valeur de la queue? Que faut-il faire lorsque la queue est vide? On ne peut pas garder le verrou en attendant que la queue se remplisse, car cela empêcherait justement un autre coprocessus de remplir la queue. Il faut donc le rendre. Mais comment savoir quand la queue ne sera plus vide, sinon qu’en testant à nouveau périodiquement? Cette solution appelée “attente active” n’est bien sûr pas satisfaisante. Ou bien elle consomme du temps de calcul inutilement (période trop courte) ou bien elle n’est pas assez réactive (période trop longue).

Les conditions permettent de résoudre ce problème. Les conditions sont des objets sur lequel un coprocessus qui possède un verrou peut se mettre en attente jusqu’à ce qu’un autre coprocessus envoie un signal sur cette condition.

Comme les verrous, les conditions sont des structures passives qui peuvent être manipulées par des fonctions de synchronisation. On peut les créer par la fonction create.

     
   val Condition.create : unit -> Condition.t

Un processus p qui possède déjà un verrou v peut se mettre en attente sur une condition c et le verrou v en appelant wait  c  v. Le processus p informe le système qu’il est en attente sur la condition c et le verrou v puis libère le verrou v et s’endort. Il ne sera réveillé par le système que lorsqu’un autre processus q aura signalé un changement sur la condition c et lorsque le verrou v sera disponible; le processus p tient alors à nouveau le verrou v.

     
   val Condition.wait : Condition.t -> Mutex.t -> unit

Attention! c’est une erreur d’appeler Condition.wait  c  v sans être en possession du verrou v. Le comportement de Condition.wait vis à vis des signaux est le même que pour Mutex.lock.

Lorsqu’un coprocessus signale un changement sur une condition, il peut demander ou bien que tous les coprocessus en attente sur cette condition soient réveillés (broadcast), ou bien qu’un seul parmi ceux-ci soit réveillés (signal).

     
   val Condition.signal : Condition.t -> unit
   val Condition.broadcast : Condition.t -> unit

L’envoi d’un signal ou d’un broadcast sur une condition ne nécessite pas d’être en possession d’un verrou (à la différence de l’attente), dans le sens où cela ne déclenchera pas une erreur «système». Cependant, cela peut parfois être une erreur de programmation.

Le choix entre le réveil d’un coprocessus ou de tous les coprocessus dépend du problème. Pour reprendre l’exemple de la file d’attente, si un coprocessus ajoute un élément dans une queue vide, il n’a pas besoin de réveiller tous les autres puisqu’un seul pourra effectivement retirer cet élément. Par contre, s’il ajoute un nombre d’éléments qui n’est pas connu statiquement ou bien qui est très grand, il doit demander le réveil de tous les coprocessus. Attention! si l’ajout d’un élément dans une queue non vide n’envoie pas de signal, alors l’ajout d’un élément dans une queue vide doit envoyer un broadcast, car il pourrait être immédiatement suivi d’un autre ajout (sans signal) donc se comporter comme un ajout multiple. En résumé, soit chaque ajout envoie un signal, soit seul l’ajout dans une queue vide envoie un broadcast. Le choix entre ces deux stratégies est un pari sur le fait que la queue est généralement vide (première solution) ou généralement non vide (deuxième solution).

Il est fréquent qu’un coprocessus n’ait qu’une approximation de la raison pour laquelle un autre peut être en attente sur une condition. Il va donc signaler sur cette condition par excès simplement lorsque la situation peut avoir évolué pour un coprocessus en attente. Un processus réveillé ne doit donc pas supposer que la situation a évolué pour lui et que la condition pour laquelle il est mis en attente est maintenant remplie. Il doit en général (presque impérativement) tester à nouveau la configuration, et si nécessaire se remettre en attente sur la condition. Il ne s’agit plus cette fois-ci d’une attente active, car cela ne peut se passer que si un coprocessus à émis un signal sur la condition.

Voici une autre raison qui justifie cette discipline: lorsqu’un coprocessus vient de fournir une ressource en abondance et réveille tous les autres par un broadcast, rien n’empêche le premier réveillé d’être très gourmand et d’épuiser la ressource en totalité. Le second réveillé devra se rendormir en espérant être plus chanceux la prochaine fois.

Nous pouvons maintenant donner une solution concrète pour les queues partagées. La structure de queue définie dans le module Queue est enrichie avec un verrou et une condition non_empty.

     
   type 'a t =
     { queue : 'a Queue.tlock : Mutex.tnon_empty : Condition.t }
   let create () =
     { queue = Queue.create();
       lock = Mutex.create(); non_empty = Condition.create() }

L’ajout n’est jamais bloquant, mais il ne faut pas oublier de signaler la condition non_empty lorsque la liste était vide avant ajout, car il est possible que quelqu’un soit en attente sur la condition.

     
   let add e q =
     Mutex.lock q.lock;
     if Queue.length q.queue = 0 then Condition.broadcast q.non_empty;
     Queue.add e q.queue;
     Mutex.unlock q.lock;;

Le retrait est un tout petit peu plus compliqué:

     
   let take q =
     Mutex.lock q.lock;
     while Queue.length q.queue = 0
     do Condition.wait q.non_empty q.lock done;
     let x = Queue.take q.queue in
     Mutex.unlock q.lockx;;

Après avoir acquis le verrou, on peut essayer de retirer l’élément de la queue. Si la queue est vide, il faut se mettre en attente sur la condition non_empty. Au réveil, on essayera à nouveau, sachant qu’on a déjà le verrou.

Comme expliqué ci-dessus, le signal Condition.broadcast q.non_empty (8) est exécuté par un coprocessus p en étant en possession du verrou q.lock. Cela implique qu’un coprocessus lecteur q exécutant la fonction take ne peut pas se trouver entre la ligne 13 et 14 où il serait prêt à s’endormir après avoir vérifié que la queue est vide mais ne l’aurait pas encore fait: dans ce cas, le signal émis par p serait inopérant et ignoré, le coprocessus q n’étant pas encore endormi et q s’endormirait et ne serait pas réveillé par p ayant déjà émis son signal. Le verrou assure donc que soit q est déjà endormi, soit q n’a pas encore testé l’état de la queue.

Exercice 20   Implémenter une variante dans laquelle la queue est bornée: l’ajout dans la queue devient bloquant lorsque la taille de la queue a atteint une valeur fixée à l’avance. (Dans un monde concurrent, on peut avoir besoin de ce schéma pour éviter qu’un producteur produise sans fin pendant que le consommateur est bloqué.
(Voir le corrigé)

7.7  Communication synchrone entre coprocessus par événements

Les fonctions décrites dans cette section sont définies dans le module Event.

Verrous et conditions permettent ensemble d’exprimer toutes les formes de synchronisation. Toutefois, leur mise en œuvre n’est pas toujours aisée comme le montre l’exemple a priori simple des files d’attente dont le code de synchronisation s’avère a posteriori délicat.

La communication synchrone entre coprocessus par événements est un ensemble de primitives de communication de plus haut niveau qui tend à faciliter la programmation concurrente. L’ensemble des primitives de la bibliothèque Event a été initialement développé par John Reppy pour le langage Standard ML, appelé Concurrent ML [14]. En OCaml, ces primitives sont implantées au dessus de la synchronisation plus élémentaire par verrous et conditions. Les primitives sont regroupées dans le module Event.

La communication se fait en envoyant des événements au travers de canaux. Les canaux sont en quelque sorte des “tuyaux légers”: ils permettent de communiquer entre les coprocessus d’un même programme en gérant eux-mêmes la synchronisation entre producteurs et consommateurs. Un canal transportant des valeurs de type 'a est de type 'a Event.channel. Les canaux sont homogènes et transportent donc toujours des valeurs du même type. Un canal est créé avec la primitive new_channel.

     
   val Event.new_channel : unit -> 'a channel

L’envoi ou la réception d’un message ne se fait pas directement, mais par l’intermédiaire d’un événement. Un événement élémentaire est “envoyer un message” ou “recevoir un message”. On les construit à l’aide des primitives suivantes:

     
   val Event.send : 'a channel -> 'a -> unit event
   val Event.receive : 'a channel -> 'a event

Le construction d’un message n’a pas d’effet immédiat et se limite à la création d’une structure de donnée décrivant l’action à effectuer. Pour réaliser un événement, le coprocessus doit se synchroniser avec un autre coprocessus souhaitant réaliser l’événement complémentaire. La primitive sync permet la synchronisation du coprocessus pour réaliser l’événement passé en argument.

     
   val Event.sync : 'a event -> 'a

Ainsi pour envoyer une valeur v sur le canal c, on pourra exécuter sync (send c v). Le coprocessus est suspendu jusqu’à ce que l’événement se réalise, c’est-à-dire jusqu’à ce qu’un autre coprocessus soit prêt à recevoir de une valeur sur le canal c. De façon symétrique, un processus peut se mettre en attente d’un message sur le canal c en effectuant sync (receive c).

Il y a compétitions entre tous les producteurs d’une part et tous les consommateurs d’autre part. Par exemple, si plusieurs coprocessus essayent d’émettre un message sur un canal alors qu’un seul est prêt à le lire, il est un clair qu’un seul producteur réalisera l’événement. Les autres resteront suspendus, sans même s’apercevoir qu’un autre a été “servi” avant eux.

La compétition peut également se produire au sein d’un même coprocessus. Deux événements peuvent être combinés par la primitive choose:

     
   val Event.choose : 'a event list -> 'a event

L’événement résultant est une offre en parallèle des événements passés en arguments qui se réalisera lorsque qu’un exactement de ceux-ci se réalisera. Il bien faut distinguer là l’offre d’un événement de sa réalisation. L’appel sync  (choose  e1  e2) se synchronise en offrant au choix deux événements e1 et e2, mais un seul des deux événements sera effectivement réalisé (l’offre de l’autre événement sera simultanément annulée). La primitive wrap_abort permet à un événement d’agir lorsqu’il est ainsi annulé.

     
   Event.wrap_abort : 'a event -> (unit -> unit) -> 'a event

L’appel wrap_abort  e  f construit un événement e′ équivalent à e mais tel que si e n’est pas réalisé après sa synchronisation alors la fonction f est exécutée (cela n’a d’intérêt que si e′ fait parti d’un événement complexe).

Un coprocessus peut proposer de réaliser un événement sans se bloquer (un peu à la manière de Mutex.try_lock).

     
   val Event.poll : 'a event -> 'a option

L’appel Event.pool  e va offrir l’événement e mais si celui-ci ne peut pas être immédiatement réalisé, il retire l’offre au lieu de se bloquer et tout se passera comme s’il n’avait rien fait (ou plus exactement comme si l’expression Event.pool  e avait été remplacée par la valeur None). Par contre, si l’événement peut se réaliser immédiatement, alors tout se passera comme si le coprocessus avec effectué Event.sync  e, sauf que c’est la valeur Some  v plutôt que v qui est retournée.

Exemple:

Dans l’exemple 7.3 du crible d’Ératosthène la communication entre les différents coprocessus s’effectue par des tuyaux comme dans le programme d’origine, donc en utilisant la mémoire du système (le tuyau) comme intermédiaire. On peut penser qu’il serait plus efficace de communiquer directement en utilisant la mémoire du processus. Une solution simple consisterait à remplacer le tuyau par un canal sur lequel sont envoyés les entiers.

Passer des entiers dans le canal n’est pas suffisant car il faut aussi pouvoir détecter la fin du flux. Le plus simple est donc de passer des éléments de la forme Some  n et de terminer en envoyant la valeur None. Pour simplifier et souligner les changements en les minimisant, nous allons récupérer le code de l’exemple 5.2. Pour cela, nous simulons les tuyaux et les fonctions de lecture/écriture sur un tuyau par des canaux et de fonctions de lecture/écriture sur les canaux.

Il suffit de reprendre la version précédente du programme et de modifier les fonctions d’entrée-sortie pour qu’elles lisent et écrivent dans un canal plutôt que dans un tampon d’entrée-sortie de la bibliothèque Pervasives, par exemple en insérant à la ligne 2 le code suivant:

     
   let pipe () = let c = Event.new_channel() in cc
   let out_channel_of_descr x = x
   let in_channel_of_descr x = x
   
   let input_int chan =
     match Event.sync (Event.receive chanwith
       Some v -> v
     | None -> raise End_of_file
   let output_int chan x = Event.sync (Event.send chan (Some x))
   let close_out chan = Event.sync (Event.send chan None);;

Toutefois, si l’on compare l’efficacité de cette version avec la précédente, on trouve qu’elle est deux fois plus lente. La communication de chaque entier requiert une synchronisation entre deux coprocessus donc plusieurs appels systèmes pour prendre et relâcher le verrou. Au contraire, la communication par tuyau utilise des entrées/sorties temporisées qui permettent d’échanger plusieurs milliers d’entiers à chaque appel système.

Pour être juste, il faudrait donc également fournir une communication temporisée sur les canaux en utilisant le canal seulement pour échanger un paquet d’entiers. Le fils peut accumuler les résultats dans une queue qu’il est seul à posséder, donc dans laquelle il peut écrire sans se synchroniser. Quand la queue est pleine ou sur demande explicite, il la vide en se synchronisant sur le canal. Le père à lui même sa propre queue qu’il reçoit en se synchronisant et qu’il vide petit à petit.

Voici une solution (qui remplace des lignes 2–0 ci-dessus):

     
   type 'a buffered =
       { c : 'a Queue.t Event.channelmutable q : 'a Queue.tsize : int }
   let pipe () = let c = Event.new_channel() in cc;;
   
   let size = 1024;;
   let out_channel_of_descr chan =
     { c = chanq = Queue.create(); size = size };;
   let in_channel_of_descr = out_channel_of_descr;;
   
   let input_int chan =
     if Queue.length chan.q = 0 then begin
       let q = Event.sync (Event.receive chan.cin
       if Queue.length q > 0 then chan.q <- q
       else raise End_of_file
     end;
     Queue.take chan.q;;
   
   let flush_out chan =
     if Queue.length chan.q > 0 then Event.sync (Event.send chan.c chan.q);
     chan.q <- Queue.create();;
   
   let output_int chan x =
     if Queue.length chan.q = size then flush_out chan;
     Queue.add x chan.q
   
   let close_out chan =
     flush_out chan;
     Event.sync (Event.send chan.c chan.q);;

Cette version permet de retrouver une efficacité comparable à la version avec tuyau (mais pas meilleure).

Si l’on compare avec la version d’origine avec processus et tuyaux, il y a deux sources potentielles de gain: d’une part les coprocessus sont plus sobres et coûtent moins cher à leur lancement. D’autre part, la communication par le canal se contente de passer un pointeur sans recopie. Mais ces gains ne sont pas perceptibles ici, car le nombre de coprocessus créés et les structures échangées ne sont pas suffisamment grands devant le coût de l’appel système et devant les temps de calcul.

En conclusion, on peut retenir que la communication entre coprocessus a un coût qui peut aller jusqu’à celui d’un appel système (si le processus doit être suspendu) et que ce coût peut être significativement réduit en temporisant les communications pour communiquer moins souvent de plus grosses structures.

Exercice 21   Un serveur HTTP peut être soumis à une charge élevée et par à-coups. Pour améliorer le temps de réponse, on peut raffiner l’architecture d’un serveur HTTP en gardant toujours une dizaine de coprocessus prêts à traiter de nouvelles requêtes. Cela veut dire qu’un coprocessus ne traite plus une seule requête, mais une suite potentiellement infinie de requêtes qu’il lit dans une file d’attente.

Pour éviter l’écroulement de la machine, on peut limiter le nombre de coprocessus à une valeur raisonnable au delà laquelle le coût de la gestion des tâches dépasse la latence du service des requêtes (temps passés à attendre des données sur le disque, etc.). Au delà, on pourra garder quelques connexions en attente d’être traitées, puis finalement refuser les connexions. Lorsque la charge diminue et que le nombre de coprocessus est au-delà de la valeur “idéale”, certains se laissent mourir, les autres restent prêts pour les prochaines requêtes.

Transformer l’exemple 7.5 pour arriver à cette architecture.

7.8  Quelques détails d’implémentation

Implémentation des coprocessus en Unix

Le système Unix n’a pas été conçu au départ pour fournir du support pour les coprocessus. Toutefois, la plupart des implémentations modernes d’Unix offrent maintenant un tel support. Malgré tout, les coprocessus restent une pièce rapportée qui est parfois apparente. Par exemple, dès l’utilisation de coprocessus il est fortement déconseillé d’utiliser fork autrement que pour faire exec immédiatement après. En effet, fork copie le coprocessus courant qui devient un processus estropié car s’exécutant en croyant avoir des coprocessus qui en fait n’existent pas. Le père lui continue à s’exécuter a priori normalement. Le cas particulier d’un appel à fork où le fils lance immédiatement un autre programme ne pose quant à lui pas de problème. Heureusement! car c’est le seul moyen de lancer d’autres programmes.

Inversement, on peut faire fork (non suivi de exec) puis lancer ensuite plusieurs coprocessus dans le fils et le père, sans aucun problème.

Implémentation native et implémentation simulée en OCaml

Lorsque le système d’exploitation sous-jacent possède des coprocessus, OCaml peut fournir une implémentation native des coprocessus, en laissant le plus possible leur gestion au système d’exploitation. Chaque coprocessus réside alors dans un processus Unix différent mais partageant le même espace mémoire.

Lorsque le système ne fournit pas de support pour les coprocessus, OCaml peut les émuler. Tous les coprocessus s’exécutent alors dans le même processus Unix et leur gestion y compris leur ordonnancement est effectuée par l’environnement d’exécution d’OCaml. Toutefois, cette implémentation n’est disponible qu’avec la compilation vers du bytecode.

Le système OCaml offre une même interface programmatique pour les versions native et simulée des coprocessus. L’implémentation des coprocessus est donc dédoublée. Une implémentation pour la version émulée qui comporte son propre contrôleur de tâches et une autre implémentation qui s’appuie sur les coprocessus POSIX (1003.1c) et relève les fonctions des bibliothèques correspondantes au niveau du langage OCaml. Au passage, le langage OCaml effectue quelques tâches administratives simples et assure une interface identique avec la version émulée. Cela garantit qu’un programme compilable sur une architecture Unix reste compilable sur une autre architecture Unix. Toutefois, le fait que les processus soient émulés ou natifs peut changer la synchronisation des appels à des bibliothèques C, et ainsi changer, malgré tout, la sémantique du programme. Il faut donc prendre quelques précautions avant de considérer qu’un programme se comportera de la même façon dans les deux versions. Dans ce chapitre, le discours vaut principalement pour les deux implantations, mais rapelons qu’à défaut, nous avons pris le point de vue d’une implantation native.

Pour utiliser les coprocessus émulés, il faut passer l’option -vmthreads à la place de -threads au compilateur ocamlc. Cette option n’est pas acceptée par le compilateur ocamlopt.

Séquentialisation du code OCaml

L’implémentation des coprocessus en OCaml doit faire face à une des particularités du langage OCaml qui est la gestion automatique de la mémoire et sa consommation importante de données allouées. La solution retenue, qui est la plus simple et aussi généralement la plus efficace, est de séqentialiser l’exécution du code OCaml dans tous les coprocessus: un verrou de l’environnement d’exécution empêche deux coprocessus d’exécuter du code OCaml simultanément. Cela semble contraire à l’idée même des coprocessus, mais il n’en est rien. Car le verrou est évidemment relâché avant les appels systèmes bloquants et repris au retour. D’autres coprocessus peuvent donc prendre la main à ce moment là. Un cas particulier d’appel système étant l’appel à sched_yield effectué à intervalles de temps réguliers pour permettre de suspendre le coprocessus en cours d’exécution et donner la main à d’autres.

Sur une machine multiprocesseur, la seule source de vrai parallélisme provient de l’exécution du code C et des appels systèmes. Sur une machine mono-processeur, le fait que le code OCaml soit séquentialisé n’est pas vraiment perceptible.

Le programmeur ne peut pas pour autant s’appuyer sur cette séquentialisation, car un coprocessus peut donner la main à un autre à peu près à n’importe quel moment. À une exception près, la séquentialisation garantit la cohérence de la mémoire: deux coprocessus ont toujours la même vision de la mémoire, sauf, peut-être, lorsqu’ils exécutent du code C. En effet, le passage du verrou implique une synchronisation de la mémoire: une lecture à une adresse effectuée par un coprocessus qui se situe dans le temps après une opération d’écriture à cette même adresse par un autre coprocessus retournera toujours la valeur fraîchement écrite, sans besoin de synchronisation supplémentaire.

Coprocessus et signaux

D’une façon générale, l’utilisation des signaux est déjà délicate avec un seul coprocessus en raison de leur caractère asynchrone. Elle l’est encore plus en présence de plusieurs coprocessus car s’ajoute alors de nouvelles difficultés: à quel coprocessus doit être envoyé le signal? À tous, au principal, ou à celui en cours d’exécution? Que se passe-t-il si un coprocessus envoie un signal à un autre? En fait, les coprocessus ont été implantés avant de bien répondre à ces questions, et différentes implantations peuvent se comporter différemment par rapport aux signaux.

Les opérations Thread.join, Mutex.lock, and Condition.wait, bien qu’étant des appels systèmes longs, ne sont pas interruptibles par un signal. (Elle ne peuvent donc pas échouer avec l’erreur EINTR). Si un signal est envoyé pendant l’attente, il sera reçu et traité lorsque l’appel retournera.

La norme POSIX spécifie que le handler des signaux est partagé entre tous les coprocessus et au contraire le masque des signaux est propre à chaque coprocessus et hérité à la création d’un coprocessus. Mais le comportement des coprocessus vis-à-vis des signaux reste largement sous-spécifié et donc non portable.

Il est donc préférable d’éviter autant que possible l’utilisation de signaux asynchrones (du type sigalrm, sigvtalrm, sigchld, etc.) avec des coprocessus. Ceux-ci peuvent être bloqués et consultés avec Thread.wait_signal. On peut dédier un coprocessus au traitement des signaux: ne faisant que cela, il peut se mettre en attente sur la réception de signaux et entreprendre les actions nécessaires et mettre à jour certaines informations consultées par d’autres coprocessus.

De plus, les coprocessus OCaml (depuis la version 3.08) utilisent le signal sigvtalarm de façon interne pour effectuer la préemption des coprocessus. Ce signal est donc réservé et ne doit pas être utilisé par le programme lui-même, car il y a un risque d’interférence.


Previous Up Next