Acelerando, paralelizando, concurrencia, memoria compartida entre procesos en Java

2022-06-19 - Categorías: General / Java

Hoy traigo otro codekata, un howto o resumen sobre Java, y el manejo de procesos. Se trata de un tema recurrente, para acelerar procesos paralelizándolos y coordinándolos entre sí. Este post se centra en cómo usar una memoria compartida, entre 30 procesos en paralelo, que van escribiendo información en dicha memoria compartida.

En Java se facilita mucho todo esto, tiene funcionalidades que ayudan como por ejemplo: la clase Thread, los parámetros por referencia, la palabra reservada synchronized, o el manejo de hilos en núcleos diferentes del procesador de forma transparente para el programador gracias al JRE.

Resumiendo, esto es un esqueleto para tratar de multiplicar x30 la velocidad de un programa, usando procesos concurrentes, y compartiendo los resultados en memoria.

A grano..

Preparando una memoria compartida entre procesos

Podemos declarar un objeto que va a representar la memoria compartida. Este objeto podemos entonces pasarlo por referencia a todos los hilos concurrentes que van a ejecutar los procesos. Así cada hilo de ejecución podrá ir escribiendo sus resultados, y esta memoria compartida será accesible por todos los procesos del programa.

Un posible objeto de memoria compartida podría ser el siguiente:

import java.util.HashMap;

public class SharedMemory {
    private HashMap<Integer, Integer> status;

    public SharedMemory(int kthreads) {
        this.status = new HashMap<Integer, Integer>();
        for (int i = 0; i < kthreads; i++) {
            this.status.put(i, 0);
        }
    }

    // public synchronized void setStatusKey(Integer key, Integer value) {
    public void setStatusKey(Integer key, Integer value) {
        this.status.put(key, value);
    }

    public HashMap<Integer, Integer> getStatus() {
        return this.status;
    }

    public Integer getStatusKey(Integer key) {
        return this.status.get(key);
    }
}

Resumiendo, tenemos una variable status que es un array de estados, va a representar un porcentaje del proceso realizado por cada hilo de ejecución. En la inicialización de este objeto se le pasará el número de hilos de ejecución, para que inicialice a 0 el porcentaje hecho de cada proceso.

Como curiosidad dejo el uso de synchronized para realizar un acceso excluyente a memoria compartida, pero esto es tema para otro post.. 😉 aquí el otro post sobre acceso exclusivo a memoria compartida.

Los procesos de trabajo, los hilos de ejecución concurrentes, paralelos

Lo siguiente es preparar los procesos que queremos paralelizar para acelerar todo. En este caso, todos los procesos van a ser iguales por simplificar, pero podría ser cada proceso diferente:

public class TaskInAThread extends Thread {
    private SharedMemory sharedMemory;
    private Integer key;

    public TaskInAThread(SharedMemory sharedMemory, Integer key) {
        this.sharedMemory = sharedMemory;
        this.key = key;
    }

    @Override
    public void run() {
        super.run();

        try {
            for (int i = 1; i <= 100; i++) {
                Thread.sleep((long) ((Math.random() * 10 + 1) * 100));
                this.sharedMemory.setStatusKey(this.key, i);
            }
        } catch (Exception e) {
            System.out.println("ERRO: " + e.getMessage());
        }
    }
}

Cada proceso recibe entonces una referencia al objeto compartido llamado sharedMemory. Esto es así porque Java cuando recibe un objeto complejo, lo recibe por referencia. Esto quiere decir que todos los procesos tienen en la sharedMemory una referencia que apunta al mismo objeto, por esto es compartido.

Con las variables simples no ocurre así, por ejemplo con un Integer. Con los objetos simples, cuando se pasan a funciones u otros objetos, se hace una copia totalmente independiente. Entonces en estos casos el objeto simple no es compartido.

Cada proceso entonces, sólo va a ir escribiendo en memoria compartida su porcentaje de estado con la línea:

                this.sharedMemory.setStatusKey(this.key, i);

..y desde cualquier otro proceso se podrá consultar el estado de todos los procesos.

El proceso principal que lo lanza todo

Lo siguiente entonces es enlazarlo todo. Necesitaremos por lo menos un proceso principal que cree el objeto de memoria compartida, y que cree los 30 objetos de procesamiento lanzándolos. Esto se podría hacer tal que así:

import java.time.Clock;

public class Main {
    private static final int KTHREADS = 30;

    public static void main(String[] args) {
        System.out.println("Starting..");

        // Create the shared memory..
        SharedMemory sharedMemory = new SharedMemory(KTHREADS);

        // Create the threads and launch them..
        TaskInAThread[] threads = new TaskInAThread[KTHREADS];
        for (int i = 0; i < KTHREADS; i++) {
            threads[i] = new TaskInAThread(sharedMemory, i);
            threads[i].start();
        }

        // Finish..
        System.out.println("All done!");
    }
}

Haciendo esto tenemos que ver por pantalla esto:

Por defecto Java va a esperar, en este proceso principal, a que todos los subprocesos terminen. Mostrará por pantalla la cadena «All done!», pero no terminará por completo hasta que todos los objetos TaskInAThread terminen también. Estos procesos irán escribiendo su porcentaje de trabajo hecho en sharedMemory. Si queremos visualizar este estado podemos entonces añadir algo tal que así:

        // While not all threads achieve status 100%..
        String aux;
        boolean stop = false;
        while (!stop) {
            try {
                Thread.sleep(1000);

                aux = "";
                for (int i = 0; i < KTHREADS; i++) {
                    aux += sharedMemory.getStatusKey(i) + "%,";
                }
                System.out.println(aux + (clock.millis() / 1000 - startSecs) + " secs..");

                // Check if all finished..
                stop = true;
                for (int i = 0; i < KTHREADS; i++) {
                    if (threads[i].isAlive()) {
                        stop = false;
                    }
                }
            } catch (Exception e) {
                System.out.println("ERROR: " + e.getMessage());
            }
        }
        System.out.println("All done!");

Ahora si lo volvemos a lanzar tenemos que ver por pantalla algo parecido a lo siguiente:

Faltan unas variables al principio en el Main.java. Este Main al completo podría quedar tal que así:

import java.time.Clock;

public class Main {
    private static final int KTHREADS = 30;

    public static void main(String[] args) {
        Clock clock = Clock.systemUTC();
        Long startSecs = clock.millis() / 1000;
        System.out.println("Starting..");

        // Create the shared memory object to store the status of all threads..
        SharedMemory sharedMemory = new SharedMemory(KTHREADS);

        // Create the threads and launch them..
        TaskInAThread[] threads = new TaskInAThread[KTHREADS];
        for (int i = 0; i < KTHREADS; i++) {
            threads[i] = new TaskInAThread(sharedMemory, i);
            threads[i].start();
        }

        // While not all threads achieve status 100%..
        String aux;
        boolean stop = false;
        while (!stop) {
            try {
                Thread.sleep(1000);

                aux = "";
                for (int i = 0; i < KTHREADS; i++) {
                    aux += sharedMemory.getStatusKey(i) + "%,";
                }
                System.out.println(aux + (clock.millis() / 1000 - startSecs) + " secs..");

                // Check if all finished..
                stop = true;
                for (int i = 0; i < KTHREADS; i++) {
                    if (threads[i].isAlive()) {
                        stop = false;
                    }
                }
            } catch (Exception e) {
                System.out.println("ERROR: " + e.getMessage());
            }
        }
        System.out.println("All done!");
    }
}

Terminando, recapitulando

Para terminar con el post, recapitulando con todo el codekata, tenemos 30 procesos que se ejecutan en paralelo, guardando en un objeto de memoria compartida su porcentaje de ejecución. Y por otro lado desde el programa principal, mostramos por pantalla el porcentaje hecho de los 30 procesos, a partir de la información que van almacenando en memoria compartida, hasta que hayan terminado.

Esta forma de programar, en este caso, podría llegar a multiplicar x30 la velocidad de todo el proceso, si los recursos lo permiten..

Deja una respuesta

Tu dirección de correo electrónico no será publicada.

 

© 2022 JnjSite.com - MIT license

Sitio hecho con WordPress, diseño y programación del tema por Jnj.