Les bons tuyaux de core.async
Clojure est le langage le plus simple et le plus cohérent que je connaisse. Le coeur du langage est très concis et fournit pourtant tous les outils nécessaires à l’écriture efficace de programmes modernes : manipulation de données, et multithreading.
core.async est une librairie qui introduit des outils très simples conceptuellement qui offrent des possibilités incroyables. Elle fait partie du projet clojure mais est facultative.
core.async
est similaire à ce que propose Go avec ses goroutines et channels. Ces concepts étant eux-même inspirés de l’algèbre CSP.
L’objectif de cet article est de vous montrer quelques possibilités offertes par core.async
en introduisant petit à petit les concepts. Nous verrons comment core.async
permet de découpler son code en producteur / consommateur d’information, comment elle permet d’encapsuler un traitement dans une unité de travail, comment elle permet de paralléliser son code sans contrainte, et comment elle ouvre les portes à la programmation réactive.
Exemples
La plupart des exemples tournent autours du jeu de Morra qui est une espèce de shi-fu-mi antique. Deux joueurs doivent en même temps produire un nombre avec leurs doigts et crier la somme des doigts des deux joueurs. Le gagnant est celui qui a bien deviné la somme.
Pour taper vous-même les exemples dans un repl, les imports suivants sont suffisants :
(require '[ clojure.core.async :as async :refer [thread chan <!! <! >!! >! go go-loop]])
Channels
L’élément central de core.async est le channel. Il s’agit d’un tuyau, d’un canal, conceptuellement proche d’une queue dans un outils de messaging.
Un channel se crée simplement :
(chan)
Un tel channel ne peut contenir qu’une seule valeur. Cela signifie que l’écriture dans ce channel est bloquée tant que personne ne lit la valeur présente.
Un channel peut être bufferisé pour contenir plusieurs valeurs . Ici on crée un channel qui peut contenir 2 valeurs :
(chan 2)
Un tel channel permet d’écrire consécutivement 2 valeurs sans être obligé d’attendre que quelqu’un ne lise les valeurs.
Je conseille de travailler avec des channels non bufferisés dans un premier temps. Cela rend le raisonnement et la compréhension du comportement plus aisés.
Un channel est une valeur comme une autre. Ici on l’associe à une Var :
(def player1 (chan 1))
On pourrait donc associer le channel player1
dans une Map
en tant que clé ou valeur par exemple. Et si on est vicieux, on peut même le faire transiter dans un autre channel.
Inversion Of Control
L’IoC, est un concept assez général dont le but est de casser les dépendances qu’un module a avec un autre. L’injection de dépendance permet de faire de l’IOC (voir Guice, Spring, …). Mais la programmation évênementiel, dans le navigateur, où l’on bind un évênement à une callback est aussi une forme d’IOC, dans le sens où le flot d’exécution n’est plus programmé séquentiellement. Quand je dis que core.async
permet de faire de l’IOC, c’est plus dans ce sens.
Traditionellement
Dans les langages fonctionnels, l’IoC par injection de dépendance est assez naturelle puisqu’on peut passer des fonctions en paramètre (fonctions de 1er ordre) :
(defn print-claim [m] (print m))
(defn player-claim [fingers sum handler]
(handler [fingers sum]))
(player-claim 5 8 print-claim)
Ceci affiche 5 8
dans la console. Le comportement de la fonction player-claim
a été injecté.
Avec core.async
Avec core.async
, on part d’un channel non bufferisé :
(def player1 (chan))
Pour écrire dans un channel, on utilise la fonction bloquante >!!
(>!! player1 [5 8])
A ce stade, player1
contient le vecteur [5 8]
. Par contre l’exécution du programme est bloquée jusqu’à ce que la valeur du channel soit lue. Il existe bien une opération de lecture <!!
, mais puisqu’on est bloqué, on ne pourra jamais l’exécuter dans le même thread. Comment faire ?
La solution est d’effectuer l’opération de lecture dans une autre unité d’exécution, soit un thread.
(thread
(loop [m "player 1 moves : "]
(println m)
(recur (<!! player1))))
La macro thread
permet de créer … un thread. Ce thread boucle indéfiniment. A chaque itération de boucle il affiche la valeur
contenue dans le channel player1
. Et dans un REPL ça donne :
user=> (def player1 (chan))
#'user/player1
user=> (thread (loop [m "player 1 moves : "] (println m) (recur (<!! player1))))
player 1 moves :
#
user=> (>!! player1 [5 8])
true
[5 8]
user=> (>!! player1 [1 2])
[1 2]
true
user=> (>!! player1 [1 3])
[1 3]
true
On a un thread qui produit les mouvements du joueur 1 sans savoir ce qui en sera fait. Et on a un autre thread qui reçoit les mouvements et les affiche. Au runtime nous avons nous même fait l’association producteur / consommateur. Cela correspond à la notion d’Invertion of Control, si ce n’est qu’on a codé notre propre framework et utilisant un channel pour faire la colle entre les composants.
La version du code utilisant core.async
va plus loin en rendant le code asynchrone. A chaque composant sa spécialisation, à chaque composant son unité d’exécution.
Modèle d’acteur
Wikipedia définit le modèle d’acteur ainsi :
Un acteur est une entité capable de calculer, qui, en réponse à un message reçu, peut parallèlement :
1) envoyer un nombre fini de messages à d’autres acteurs ;
2) créer un nombre fini de nouveaux acteurs ;
3) spécifier le comportement à avoir lors de la prochaine réception de messages.
Etudions ce petit programme :
(def player1 (chan))
(def player2 (chan))
(def result (chan))
(thread
(loop [scores {:player1 0 :player2 0 :draw 0}]
(println "Scores : " scores)
(recur
(update-in scores [(<!! result)] inc))))
(defn winner [correct-guess p1-guess p2-guess]
(cond
(= p1-guess p2-guess) :draw
(= correct-guess p1-guess) :player1
(= correct-guess p2-guess) :player2
:else :draw))
(thread
(loop []
(let [[p1-fingers p1-guess] (<!! player1)
[p2-fingers p2-guess] (<!! player2)
correct-guess (+ p1-fingers p2-fingers)]
(>!! result (winner correct-guess p1-guess p2-guess))
(recur))))
On a 2 unités d’exécution. La première est dédiée à l’affichage du score. A chaque fois qu’une valeur est présente dans result
la valeur de scores
est mise à jour.
Notons que la valeur actuelle du score est complètement encapsulée. Personne d’autre ne peut connaitre le score. La valeur du score courant est dépendante de la valeur précédente. Le 3ème point de la définition d’un acteur, spécifier le comportement à avoir lors de la prochaine réception de messages, est bien vérifié.
La seconde unité d’exécution lit tour à tour dans les channel player1
et player2
, détermine un gagnant et écrit le résultat dans result
qui sera relue par notre 1ère unité d’exécution. Le 1er point de la définition d’un acteur, envoyer un nombre fini de messages à d’autres acteurs est vrai.
Concernant le 2ème point de la définition, créer un nombre fini de nouveaux acteurs, je n’ai pas trouvé quelque chose de pertinant pour le démontrer. Cela est néanmoins possible si on remplace le premier acteur par ceci :
(defn print-stats [stats]
(let [total (apply + (vals stats))]
(println "wins : " (:wins stats) "/" total)
(println "draws : " (:draws stats) "/" total)))
(defn run-stats [channel]
(loop [stats {:wins 0, :draws 0}
total 0]
(print-stats stats)
(recur
(update-in stats [(<!! channel)] inc)
(inc total))))
(thread
(let [stats (chan)]
(thread (run-stats stats))
(loop [scores {:player1 0 :player2 0 :draw 0}]
(println "Scores : " scores)
(let [winner (<!! result)]
(>!! stats (if (= :draw winner) :draw :wins))
(recur (update-in scores [winner] inc))))))
La présence des deux threads imbriqués indique le lancement d’un acteur par un autre. Le 2ème point de la définition est vérifié.
Je pense qu’on peut appeler chaque thread un acteur, car on vient de voir que cela répond à la définition. Certes ce sont des acteurs très basiques (pas de pattern matching, pas de stratégie de mailbox, pas de priorité de messages, …). Au pire on peut dire que ce sont des objets, dans le sens où ils encapsulent un état et communiquent par message.
Programmation concurrente illimitée
Dans les précédents exemples nous avons instancié des threads pour consommer les valeurs contenues dans les channels. Chacun de ces threads consistait en une boucle infinie qui bloquait sont exécution jusqu’à ce qu’une valeur soit lisible. Ce modèle est limité par les quelques milliers de threads que l’on peut lancer sur la VM. Sans compter le temps perdu en context switch.
Imaginons maintenant que notre plateforme de jeu n’ait pas quelques milliers de threads mais plutôt un centaine de millier.
(dotimes [t 100000]
(let [c (chan)]
(thread
(loop []
(let [[_ v :as tv] (<!! c)]
(if (= 0 (mod v 100)) (println tv)))
(recur)))
(thread
(doseq [v (iterate inc 1)]
(>!! c [t v])))))
Ici on crée 2 threads 100000 fois. Le premier lit dans un channel et affiche le résultat si c’est un multiple de 100. Le second écrit les entiers de 1 à l’infini.
10000 lignes environ s’affichent puis plus rien. Le CPU monte à 100%, la JVM compte 32000 threads et boom, IOException.
Pour pallier ces limitations, core.async offre la possibilité de créer des processus ultra légers, capables de se partager un pool de threads et qui peuvent se mettre en pause sans bloquer le thread d’exécution.
(dotimes [t 100000]
(let [c (chan)]
(go
(loop []
(let [[t v :as tv] (<! c)]
(if (= 0 (mod v 100)) (println tv)))
(recur)))
(go
(doseq [v (iterate inc 1)]
(>! c [t v])))))
Dans cette version du programme, j’ai remplacé les threads créés par thread
par des processus ultra légers via go
et les appels bloquants >!!
et <!!
par leurs versions non-bloquantes >!
et <!
.
Le programme est stable et régulier : des salves d’informations s’affichent sur la console toutes les 40 secondes environ, les 8 coeurs de mon CPU sont à 40%, la consommation du heap oscille entre 1700 mégas et 160 après GC, et la JVM compte 70 threads.
Nous avons donc réussi à simuler 200000 threads ! J’ai oublié de préciser que ce code peut aussi tourner sur un navigateur … Oui on peut écrire du code asynchrone dans le navigateur sans callbacks (hell).
Functional Reactive Programming
Dans un article précédent, j’avais fait une micro présentation de FRP. Pour rappel, il s’agit d’un style de programmation basé sur la manipulation de flux de valeurs, à base de map
, filter
, zip
, concat
, … En bout de chaine de chaque flux, on peut “consommer” les valeurs du flux pour réaliser des effets de bord, un affichage ou une écriture en base par exemple.
Etudions ce bout de code :
(def player1 (chan))
(def player2 (chan))
(defn winner [correct-guess p1-guess p2-guess]
(cond
(= p1-guess p2-guess) :draw
(= correct-guess p1-guess) :player1
(= correct-guess p2-guess) :player2
:else :draw))
(defn process-round [[p1-fingers p1-guess] [p2-fingers p2-guess]]
(winner (+ p1-fingers p2-fingers) p1-guess p2-guess))
(def ch-results (async/map process-round [player1 player2]))
On commence par définir les deux channels qui contiennent les inputs des deux joueurs.
Ensuite, on a la fonction winner
que nous connaissons déjà. Puis vient la fonction process-round
qui prend les inputs des 2 joueurs et retourne le gagnant.
Enfin, nous créons un nouveau channel référencé par ch-results
. Cependant on le crée via la fonction map
. map
prend en paramètre un vecteur de channels et une fonction. La fonction a autant de paramètres que de channels dans le vecteur. map
crée un channel qui contient le résultat de f
appliquée aux premières valeurs des channels du vecteur.
Si on joue de la manière suivante :
(>!! player1 [1 2])
(>!! player2 [1 2])
(>!! player1 [1 2])
(>!! player1 [1 3])
Alors ch-results
contiendra :draw
, :player1
.
Poursuivons :
(defn async-reduce [f init ch-input]
(let [ch-output (chan)]
(go-loop [acc init]
(>! ch-output acc)
(recur (f acc (<! ch-input))))
ch-output))
(def ch-scores
(async-reduce #(update-in %1 [%2] inc) {:player1 0 :player2 0 :draw 0} ch-results))
(go (while true
(println "Scores : " (<! ch-scores))))
async-reduce
est une fonction utilitaire qui est similaire à un reduce
classique, sauf qu’au lieu de ne renvoyer qu’une seule valeur, elle renvoie un channel sur lequel elle publie tous les résultats intermédiaires.
Ensuite on crée un nouveau channel ch-scores
via async-reduce
qui contiendra tous les scores sucessifs, au fur et à mesure des parties.
Enfin un go
block consomme ch-scores
pour afficher les scores.
On a mergé, transformé des flux de données pour arriver au résultat attendu. Le side effect est à la fin du programme. Bref, du FRP.
Ce que j’aime avec ce style de programmation, c’est que c’est facilement extensible. Par exemple pour n’afficher que les résultats où un joueur a gagné, il suffit de modifier légèrement le flux de données avec un filtre :
(def ch-non-draw-results (async/filter< #(not= :draw %) ch-results ))
(def ch-scores
(async-reduce #(update-in %1 [%2] inc) {:player1 0 :player2 0} ch-non-draw-results))
Et une séquence de jeu ressemble à ça :
user=> (>!! player1 [1 2])
true
user=> (>!! player2 [1 2])
true
user=> (>!! player1 [1 2])
true
user=> (>!! player2 [1 3])
true
Scores : {:player2 0, :player1 1}
user=> (>!! player1 [1 2])
true
user=> (>!! player2 [1 3])
true
Scores : {:player2 0, :player1 2}
Les mots de la fin
On a passé en revue 4 bénéfices que l’on peut tirer de core.async
:
- Invertion Of Control
- Modèle d’acteur
- Programmation concurrente illimitée
- Functional Reactive Programming
On a réussi à implémenter tout ça avec une API super simple qui peut se résumer à :
- un tuyau
- des processus légers pour lire et écrire le tuyau
Comme nous l’avons vu, nous n’avons pas affaire à la lourdeur d’un framework qui impose une façon de faire, mais une librairie apportant les briques nécessaires pour faire du sur mesure.
Cette librairie existe aussi en ClojureScript. Cela vous permet d’écrire du code asynchrone sans callbacks dans le navigateur.
Bienvenue dans le futur.
Ressources
Rationale : explication des concepts fondamentaux et leur origine.
Walk throught Des exemples à taper dans la REPL.
Le jeu de Morra : le petit jeu implémenté dans cet article.