Intégration du multithread dans le prototype

Pour commencer cette lecture, voici un petit rappel de ce qu’il se passe lorsqu’on dispatch une action dans l’application.

On dispatch une action, qui appel une méthode, cet appel envoie un event dans l’application, la méthode appelée par le dispatch peut elle même dispatcher une autre action, qui enverra elle aussi un event, les méthode dispatchée peuvent commit des données dans le state, ce qui appellera d’autre event, etc., etc. Un bon petit schéma fait pas de mal :

Une chaine de méthodes dispatchées, de commit et d’event, ça peut vite devenir gourmand en ressources ou simplement prendre du temps, ou encore on aura peut-être besoin d’appeler certaine méthode en boucle sans avoir à bloquer tout le reste de l’application.

Du coup, la meilleure des solutions, c’est le multithreading. Le principe est très simple, nos processeurs modernes sont tous équipés de plusieurs cœurs, qui eux même peuvent être divisés en 2 thread.
Perso j’ai un 6 cœurs qui ont chacun 2 thread, ça me fait 12 thread à disposition pour exécuter 12 opération en parallèle.

Pour continuer sur l’exemple d’un marchés qu’on veut suivre de près, l’algorithme serait un peu comme ceci :

  1. Fait une requête de chargement des dernières données de prix du marché.
  2. On met à jours les données de ce marché dans le state.
  3. On recalcul les indicateurs avec les dernières données du marché.
  4. On met à jour les données des indicateurs de ce marché dans le state.
  5. On met éventuellement à jour la vue graphique si on est sur desktop.

Toutes ces actions, commit et event, peuvent prendre du temps d’exécution, le calcul des indicateur par exemple.
Imaginons maintenant qu’on veut suivre non pas juste un seul marché, mais 10, si on faisait ça de manière synchrone, on risquerait de rater des fenêtres d’occasions, peut être que certaines données viendraient à manquer, etc.

Par contre, si on utilise 10 thread pour faire ces actions, cela prendrait autant de temps que de faire un seul appel de marché.

Chaque thread exécute une chaine dispatchée (je ne les ai pas mi dedans mais cela inclus évidemment les commits et les events)

Bien entendu, sur le papier, c’est joli. Mais dans la pratique cela pose beaucoup de questions et pour beaucoup je n’ai pas encore les réponses.

Mon intégration actuelle du multithreading ne sera uniquement fait que pour des actions qu’on n’ont pas d’implication sur d’autres actions.

Mettre à jour les données d’un marché, cela ne va toucher que ce marché, rien d’autre. Il n’y aura pas d’interaction avec d’autre processus d’exécution.
Le multithread est un sujet très très complexe que je ne maîtrise pas vraiment, je vais donc essayer de garder les choses le plus simple possible.

Codons un peu tout ça !

On va d’abord commencer, par le RootApplication et lui ajouter les outil nécessaire :

Application.py
Attention je n’ai copié collé que le code que je rajoute, je mettrai le code complet en fin d’article.

from threading import Thread, Lock
from queue import Queue

class RootApplication:
    def __init__(self):
        # [...]

        # Multi Threading Attributes
        self.threads = []
        self.queue = Queue(10)
        self.lock = Lock()

        for i in range(os.cpu_count()-2):
            self.create_worker(f'Thread_{i}')

        for thread in self.threads:
            thread.start()

    def create_worker(self, name):
        new_worker = Worker(self, name)
        self.threads.append(new_worker)

    def add_worker_task(self, namespace, payload={}):
        self.lock.acquire()
        self.queue.put((namespace, payload))
        self.lock.release()

    def dispatch_threaded(self, action_name, payload={}):
        self.add_worker_task(action_name, payload)

    # [...]
  1. On rajoute des attributs permettant de stocker les Threads, un objet Queue qui permet de stack l’ensemble des tâches à faire. Un objet Lock qui permet de bloquer les thread pour synchroniser la récupération d’une tâche dans la Queue.
  2. On rajoute des boucle, une pour la création des Thread et une autre pour démarrer les thread.
  3. On créé une méthode qui permet de créer un Worker (extension de Thread).
  4. On créé une méthode qui permet d’ajouter une tâche dans la Queue des tâches.
  5. J’ai également créer une méthode alias de add_worker_task, que j’ai nommé dispatch_threaded afin d’avoir un nom intuitif, dont on sait qu’il va lancer une action dispatchée chez un Worker.

On va maintenant créer notre class Worker qui héritera de la classe Thread qu’on importe en début de page :

class Worker(Thread):
    def __init__(self, root, name):
        super().__init__()

        self.name = name
        self.root = root

    def run(self):
        while True:
            self.root.lock.acquire()
            if not self.root.queue.empty():
                namespace, payload = self.root.queue.get()
                self.root.lock.release()
                self.root.dispatch_sync(namespace, payload)
            else:
                self.root.lock.release()

Un thread reçoit un nom et reçoit l’instance du RootApplication. Pour faire simple, tous les thread partagerons le même espace en mémoire vive, c’est à dire que si un commit modifie le state, il sera changé dans tous les thread.

La méthode run() du Thread quand a elle est appelée lorsque que le thread est lancé.
Notre méthode run est très simple, elle vérifie constamment si il y a des tâches dans la Queue. Si c’est le cas, le thread la récupère et exécute un dispatch_sync() pour lancer l’action demandée.

Et en fait, ben c’est presque tout, maintenant il ne nous reste plus qu’à modifier le fichier main pour pouvoir tester ça. Il n’y aura qu’à modifier le moment ou on fait appel au dispatch, on change avec le dispatch_threaded() et le nombre de boucle à faire, moi j’ai mi 20.

main.py

if __name__ == '__main__':
    app = RootApplication()
    app.use(TestComponent)

    i = 0

    # On exécute l'action plusieurs fois pour clairement voir les effets
    while i < 20:
        print(f'LOOP : {i}')
        app.dispatch_threaded('Test.chooseNameAction', {'names': ['marc', 'morgan', 'elie', 'mia']})
        i += 1

Si vous testez ça, vous verrez que l’ensemble des print() effectué sont complètement mélangé et c’est tout à fait normal, il n’y a aucune priorisation des actions a mené, tout est totalement parallèle, certains iront plus vite que d’autre.

Voilà, l’application intègre maintenant une gestion du multithread assez simple. Il est certains que je pourrai aller beaucoup plus loin et plus en détail pour gérer les tâches bloquante par exemple. Mais n’allons pas trop loin trop vite. La gestion des tâches parallèles non bloquante est déjà pas mal.

Voilà, j’espère que cela vous aura plu ! N’hésitez pas à vous inscrire et taper des commentaires si vous en avez envie ou besoin 🙂

Le Code complet :

Application.py :

from threading import Thread, Lock
from queue import Queue
import os
import asyncio


class Context:
    def __init__(self):
        pass


class Payload:
    def __init__(self):
        pass


class AbstractComponent:
    def __init__(self, name, root):
        self.name = name
        self.root = root

        self.state = {}
        self.actions = {}
        self.mutations = {}

    async def dispatch(self, namespace, payload=Payload()):
        return await self.root.dispatch(namespace, payload)

    def dispatch_sync(self, namespace, payload=Payload()):
        return self.root.dispatch_sync(namespace, payload)

    def commit(self, namespace, payload=Payload()):
        self.root.commit(namespace, payload)

    def on(self, event_name, callback):
        self.root.on(event_name, callback)

    def emit(self, event_name, payload=Payload()):
        self.root.emit(event_name, payload)


class Worker(Thread):
    def __init__(self, root, name):
        super().__init__()

        self.name = name
        self.root = root

    def run(self):
        while True:
            self.root.lock.acquire()
            if not self.root.queue.empty():
                namespace, payload = self.root.queue.get()
                self.root.lock.release()
                self.root.dispatch_sync(namespace, payload)
            else:
                self.root.lock.release()


class RootApplication:
    def __init__(self):
        self.states = {}
        self.actions = {}
        self.mutations = {}
        self.events = {}
        self.components = {}

        # Multi Threading Attributes
        self.threads = []
        self.queue = Queue(10)
        self.lock = Lock()

        for i in range(os.cpu_count()-2):
            self.create_worker(f'Thread_{i}')

        for thread in self.threads:
            thread.start()

    def create_worker(self, name):
        new_worker = Worker(self, name)
        self.threads.append(new_worker)

    def add_worker_task(self, namespace, payload={}):
        self.lock.acquire()
        self.queue.put((namespace, payload))
        self.lock.release()

    def get_namespace(self, namespace, _space="actions"):
        parts = namespace.split('.')
        root = getattr(self, _space)

        for i in parts:
            if root[i]:
                root = root[i]

        return root

    @staticmethod
    def build_payload_object(data):
        payload = Payload()
        for i in data:
            payload.__setattr__(i, data[i])
        return payload

    async def dispatch(self, action_name, payload=Payload()):
        action = self.get_namespace(action_name)

        context = Context()
        context.__setattr__("commit", self.commit)
        context.__setattr__("dispatch", self.dispatch)

        self.emit(action_name, payload)

        return await action(context, self.build_payload_object(payload))

    def dispatch_sync(self, action_name, payload=Payload()):
        return asyncio.run(self.dispatch(action_name, payload))

    def dispatch_threaded(self, action_name, payload={}):
        self.add_worker_task(action_name, payload)

    def commit(self, mutation_name, payload=Payload()):
        mutation = self.get_namespace(mutation_name, 'mutations')
        mutation(self.states, self.build_payload_object(payload))
        self.emit(mutation_name, payload)

    def on(self, event_name, callback):
        self.events[event_name] = callback

    def emit(self, event_name, payload=Payload()):
        if event_name in self.events:
            context = Context()
            context.__setattr__("commit", self.commit)
            context.__setattr__("dispatch", self.dispatch)
            context.__setattr__("states", self.states)

            self.events[event_name](context, payload)

    def use(self, component_class):
        if issubclass(component_class, AbstractComponent):
            new_component = component_class(self)
            self.states[new_component.name] = new_component.state
            self.actions[new_component.name] = new_component.actions
            self.mutations[new_component.name] = new_component.mutations
            self.components[new_component.name] = new_component

main.py

from Application import AbstractComponent, RootApplication
import random


class TestComponent(AbstractComponent):
    def __init__(self, root):
        super().__init__("Test", root)

        self.state['name'] = "NoName"
        self.actions['chooseNameAction'] = self.chooseNameAction
        self.mutations['updateNameMutation'] = self.updateNameMutation
        self.on('Test.chooseNameAction', self.onChooseNameAction)
        self.on('Test.updateNameMutation', self.onUpdateNameMutation)

    """
    On défini une méthode de choix d'un nom parmis un tableau de nom.
    """
    async def chooseNameAction(self, context, payload):
        choice = random.choice(payload.names)
        context.commit('Test.updateNameMutation', {'name': choice})

    """
    On défini une méthode qui permettra d'afficher le payload envoyé à
    la méthode de choix de nom.
    """
    def onChooseNameAction(self, context, payload):
        print('Action Choose Name Executed')
        print(f'Payload : {payload}')
        print('-----------------------------')

    """
    On défini la méthode de mutation qui permettra de changer le state
    """
    def updateNameMutation(self, states, payload):
        self.state['name'] = payload.name

    """
    On défini une méthode permettant de vérifier le payload de la mutation
    et de vérifier que le state à bien été muté.
    """
    def onUpdateNameMutation(self, context, payload):
        print('Mutation updateNameMutation executed')
        print(f'Payload : {payload}')
        print('-----------------------------')
        print('Actual State')
        print(self.state['name'])
        print('-----------------------------')


if __name__ == '__main__':
    app = RootApplication()
    app.use(TestComponent)

    i = 0

    # On exécute l'action plusieurs fois pour clairement voir les effets
    while i < 20:
        print(f'LOOP : {i}')
        app.dispatch_threaded('Test.chooseNameAction', {'names': ['marc', 'morgan', 'elie', 'mia']})
        i += 1

Ecrire un commentaire