Este post es continuación de este otro sobre paralelizar procesos para acelerarlos usando memoria compartida. Quedó en el aire la sincronización de los procesos. Esto se puede hacer de varias formas, pero si simplemente queremos evitar que dos procesos escriban a la vez en la misma zona de memoria compartida, lo podemos implementar con la palabra reservada synchronized.
Si dos procesos no deben escribir a la vez en la memoria compartida y lo hacen, se perderá información, habrá malfuncionamiento del sistema completo.
Vamos al grano que es más fácil leer el código que explicarlo..
Objeto Java que representa la memoria compartida
SImplemente almacena el contador de movimientos, para luego probar más abajo la sincronización de su actualización.
public class SharedMemory {
private Integer movements;
public SharedMemory() {
this.movements = 0;
}
public void doMovement() {
this.movements++;
}
public Integer getMovements() {
return this.movements;
}
}
Tareas en hilos de ejecución concurrentes que usan la memoria compartida
Hilo de tipo Thread que recibe el objeto de memoria compartida para hacer movimientos. Cada hilo simplemente hará 100 movimientos.
public class TaskInAThread extends Thread {
private SharedMemory sharedMemory;
private Integer key;
public TaskInAThread(SharedMemory sharedMemory) {
this.sharedMemory = sharedMemory;
}
@Override
public void run() {
super.run();
try {
for (int i = 1; i <= 100; i++) {
this.sharedMemory.doMovement();
}
} catch (Exception e) {
System.out.println("ERROR: " + e.getMessage());
}
}
}
Proceso principal que lo lanza todo y muestra resultados
El proceso principal que lanza 30 hilos de ejecución.
import java.time.Clock;
public class Main {
private static final int KTHREADS = 30;
public static void main(String[] args) {
Clock clock = Clock.systemUTC();
Long startMillis = clock.millis();
System.out.println("Starting..");
// Create the shared memory object to store the status of all threads..
SharedMemory sharedMemory = new SharedMemory();
// Create the threads and launch them..
TaskInAThread[] threads = new TaskInAThread[KTHREADS];
for (int i = 0; i < KTHREADS; i++) {
threads[i] = new TaskInAThread(sharedMemory);
threads[i].start();
}
// While not all threads are finished..
String aux;
boolean stop = false;
while (!stop) {
try {
Thread.sleep(1000);
stop = true;
for (int i = 0; i < KTHREADS; i++) {
if (threads[i].isAlive()) {
stop = false;
}
}
} catch (Exception e) {
System.out.println("ERROR: " + e.getMessage());
}
}
System.out.println("All done in " + (clock.millis() - startMillis) + " millis and "
+ sharedMemory.getMovements() + " movements registered!");
}
}
Ejecutando sin exclusión mútua
Si ejecutamos todo tal cual veremos algo como lo siguiente:
Tenemos 30 procesos, ejecutando cada uno 100 movimientos. Pero no han sumado 3000 movimientos, si no 2568, esto se debe a que se han solapado procesos a la hora de acumular los movimientos, perdiéndose información.
Ejecutando con exclusión mútua
Esto se consigue usando la palabra reservada synchronized. Declarando las funciones de la memoria compartida de tipo synchronized, para asegurarnos de que sólo un proceso ejecuta dichas funciones.
Es decir, sin recitar la teoría sobre exclusión mútua, semáforos, concurrencia, etc.. tenemos que Java se encargará de asegurar que los procesos se esperen unos a otros, para asegurar que dichas funciones no se solapen en el tiempo nunca por dos procesos distintos.
public synchronized void doMovement() {
this.movements++;
}
El resultado es que, el acumulador de movimientos, suma ahora correcto, no se ha perdido ninguno de los movimientos hechos por los treinta procesos, todos han sumado: