¡Hola de nuevo! Esto es un codekata, apunte o resumen en Java, sobre la sincronización entre procesos concurrentes que corren en paralelo en varios hilos de ejecución usando señales. Esta sincronización es un mecanismo habitual, para sincronizar procesos en tiempo real, optimizando el uso de recursos para que todo vaya lo más rápido posible.
El escenario de este post, es que tenemos 3 procesos concurrentes, que corren en paralelo. Tenemos un primer proceso que se va a encargar de lanzar otros dos, poniéndose a la espera sin consumir recursos en esta espera. El segundo proceso va a ser un productor de elementos cada cierto tiempo. Y el tercer proceso un consumidor de estos elementos.
Un poco de teoría, pero poco 😉
Cuando comenzamos a dividir tareas en varios procesos, puede que algunos procesos necesiten información de otros procesos, entonces podemos llegar a preparar memorias compartidas para compartir información entre estos procesos. Cuando un proceso necesita cierta información de otro, puede estar constantemente consultando a ver si la tiene disponible en la memoria compartida, consumiendo CPU constántemente cada vez que consulta. Pero para optimizar esto, los procesos que esperan pueden ponerse en espera de alguna señal, no consumiendo recursos.
Esto se hace con señales entre procesos, con el uso de mecanismos de espera wait() y despertar notifiyAll(). Estos mecanismos los provee Java obteniendo programas muy legible y mantenibles. Esta forma de programar optimiza el rendimiento porque al hacer wait(), un proceso duerme, dejando libre los recursos del sistema para que otros procesos puedan avanzar por mientras, mejorando el rendimiento total.
Aquí algo más de información sobre procesamientos en paralelo/concurrentes:
https://jnjsite.com/acelerando-paralelizando-concurrencia-memoria-compartida-entre-procesos-en-java/
https://jnjsite.com/controlando-el-acceso-exclusivo-a-memoria-compartida-en-java/
Vamos al grano con los códigos fuentes..
El proceso principal
Primero que todo tenemos un proceso principal que lo lanza todo:
class Main {
public static void main(String[] args) {
Buffer buffer = new Buffer();
Producer producer = new Producer(buffer);
Consumer consumer = new Consumer(buffer);
System.out.println(System.nanoTime() + " Main> Starting..");
try {
producer.start();
Thread.sleep(200);
consumer.start();
Thread.sleep(200);
System.out.println(System.nanoTime() + " Main> Stopping..");
System.exit(0);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
La idea es que al principio sólo tenemos al productor produciendo, acumulando items. Después llega el consumidor, que consumirá todo lo que se haya acumulado, lo más rápido posible. Y cuando el consumidor se alinea con el productor, se producen y consumen los items a la par, al mismo tiempo.
El proceso productor
Simplemente genera items en el buffer cada 20 milisegundos:
public class Producer extends Thread {
private Buffer buffer;
Producer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public synchronized void run() {
super.run();
Integer i = 1;
while (true) {
try {
Thread.sleep(20);
System.out.println(System.nanoTime() + " >> Producer> Adding item..");
this.buffer.produceItemInQueue("Item" + i);
i++;
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
}
El proceso consumidor
Simplemente consume sin parar todos los mensajes pendientes:
public class Consumer extends Thread {
private Buffer buffer;
Consumer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public synchronized void run() {
super.run();
String newItem;
while (true) {
try {
System.out.println(System.nanoTime() +
" << Consumer> Looking for new items..");
newItem = this.buffer.consumeItemFromQueue();
System.out.println(System.nanoTime() + " << Consumer> Got item: " + newItem);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
}
¡Ojo! Aunque parezca que el proceso consumidor no para de consumir recursos por el while(true), realmente no lo hace. Porque dentro del while(true) llama a consumeItemFromQueue(), que lo pondrá a dormir si no hay elementos pendientes que consumir en la cola. Esto se ve claro en el siguiente apartado..
El buffer, el objeto que lo sincroniza todo, el monitor
Si revisamos el Main tenemos que se crea un objeto Buffer que se tiene accesible desde todos los 3 procesos. Este Buffer es un espacio de memoria compartida entre procesos, y va a realizar la labor de monitorizar, haciendo las paradas y notificaciones que correspondan. Vamos con el código:
import java.util.ArrayList;
public class Buffer {
private ArrayList<String> queue;
Buffer() {
queue = new ArrayList<String>();
}
public synchronized void produceItemInQueue(String item) {
this.queue.add(item);
System.out.println(System.nanoTime() + " >> Producing> Adding items: " + item +
", " + this.queue.size() + " items in the queue..");
notifyAll();
}
public synchronized String consumeItemFromQueue() throws InterruptedException {
while (this.queue.size() <= 0) {
System.out.println(System.nanoTime() + " << Consuming> Waiting for items" +
", " + this.queue.size() + " items in the queue now..");
wait();
}
System.out.println(System.nanoTime() + " << Consuming> Getting item" +
", " + this.queue.size() + " items in the queue now..");
String item = this.queue.get(0);
this.queue.remove(0);
return item;
}
}
Las dos funciones críticas son el wait() y el notifyAll().
Por un lado, tenemos que el proceso consumidor llamará al wait() en la función consumeItemFromQueue() cada vez que vaya a consumir y la cola esté vacía. De esta forma, simplemente se duerme, quedándose a la espera de que se le notifique que hay elementos para consumir. Si la cola no está vacía entonces consumirá item, porque no lanzará el wait(), sin ponerse a dormir..
Y por otro lado, tenemos que el proceso productor llamará al notifyAll() en la función produceItemInQueue() cada vez que inserte nuevos items en la cola, para despertar a cualquier proceso que se haya quedado dormido. En este caso se despertará a cualquier consumidor que se haya podido quedar dormido.
Terminando, cómo ampliar funcionalidades, y algunas explicaciones
Para terminar, sólo me queda invitarte a seguir tirando del hilo con los temas de notify, notifyAll, monitores.. También la palabra reservada synchronized tiene un papel importante, porque aplica acciones automáticamente gracias a Java, evitando que dos procesos lancen la misma función a la vez.
Otro tema a tener en cuenta es que estas paradas y notificaciones también se pueden implementar en los mismos Threads. Es decir, aquí tenemos un Buffer que implementa estas paradas y despertares, pero también se podrían implementar estos wait() y notify() dentro de cada proceso en concreto. Haciendo así haríamos algo diferente que usando notifyAll(). Es decir, de esta otra forma podríamos notificar directamente a un proceso en concreto, para que se levante y se ponga a trabajar, no a todos los procesos que estén parados como con el notifyAll(). Pero esto es material para otro post.. 😉
Si revisamos la salida por pantalla podemos ver las paradas, despertares y acciones de cada proceso:
28457176915569 Main> Starting..
28457198647731 >> Producer> Adding item..
28457198870489 >> Producing> Adding items: Item1, 1 items in the queue..
28457219020854 >> Producer> Adding item..
28457219144987 >> Producing> Adding items: Item2, 2 items in the queue..
28457239257661 >> Producer> Adding item..
28457239392054 >> Producing> Adding items: Item3, 3 items in the queue..
28457259628741 >> Producer> Adding item..
28457259748616 >> Producing> Adding items: Item4, 4 items in the queue..
28457279861550 >> Producer> Adding item..
28457280011421 >> Producing> Adding items: Item5, 5 items in the queue..
28457300141779 >> Producer> Adding item..
28457300277854 >> Producing> Adding items: Item6, 6 items in the queue..
28457320412099 >> Producer> Adding item..
28457320577659 >> Producing> Adding items: Item7, 7 items in the queue..
28457340706434 >> Producer> Adding item..
28457340837420 >> Producing> Adding items: Item8, 8 items in the queue..
28457360961095 >> Producer> Adding item..
28457361127477 >> Producing> Adding items: Item9, 9 items in the queue..
28457379095027 << Consumer> Looking for new items..
28457379284672 << Consuming> Getting item, 9 items in the queue now..
28457379364733 << Consumer> Got item: Item1
28457379411631 << Consumer> Looking for new items..
28457379461614 << Consuming> Getting item, 8 items in the queue now..
28457379517509 << Consumer> Got item: Item2
28457379559889 << Consumer> Looking for new items..
28457379622256 << Consuming> Getting item, 7 items in the queue now..
28457379680134 << Consumer> Got item: Item3
28457379724818 << Consumer> Looking for new items..
28457379766667 << Consuming> Getting item, 6 items in the queue now..
28457379819516 << Consumer> Got item: Item4
28457379865091 << Consumer> Looking for new items..
28457379917860 << Consuming> Getting item, 5 items in the queue now..
28457379970118 << Consumer> Got item: Item5
28457380011917 << Consumer> Looking for new items..
28457380056160 << Consuming> Getting item, 4 items in the queue now..
28457380108007 << Consumer> Got item: Item6
28457380152511 << Consumer> Looking for new items..
28457380191934 << Consuming> Getting item, 3 items in the queue now..
28457380245806 << Consumer> Got item: Item7
28457380286231 << Consumer> Looking for new items..
28457380339071 << Consuming> Getting item, 2 items in the queue now..
28457380391850 << Consumer> Got item: Item8
28457380440591 << Consumer> Looking for new items..
28457380481548 << Consuming> Getting item, 1 items in the queue now..
28457380530470 << Consumer> Got item: Item9
28457380570475 << Consumer> Looking for new items..
28457380608406 << Consuming> Waiting for items, 0 items in the queue now..
28457381256722 >> Producer> Adding item..
28457381400853 >> Producing> Adding items: Item10, 1 items in the queue..
28457381493607 << Consuming> Getting item, 1 items in the queue now..
28457381560502 << Consumer> Got item: Item10
28457381602441 << Consumer> Looking for new items..
28457381640863 << Consuming> Waiting for items, 0 items in the queue now..
28457401553051 >> Producer> Adding item..
28457401720515 >> Producing> Adding items: Item11, 1 items in the queue..
28457401784225 << Consuming> Getting item, 1 items in the queue now..
28457401897938 << Consumer> Got item: Item11
28457401969612 << Consumer> Looking for new items..
28457402028282 << Consuming> Waiting for items, 0 items in the queue now..
28457421836937 >> Producer> Adding item..
28457421958495 >> Producing> Adding items: Item12, 1 items in the queue..
28457422016864 << Consuming> Getting item, 1 items in the queue now..
28457422088960 << Consumer> Got item: Item12
28457422164852 << Consumer> Looking for new items..
28457422202843 << Consuming> Waiting for items, 0 items in the queue now..
28457442074154 >> Producer> Adding item..
28457442197736 >> Producing> Adding items: Item13, 1 items in the queue..
28457442248231 << Consuming> Getting item, 1 items in the queue now..
28457442310879 << Consumer> Got item: Item13
28457442346305 << Consumer> Looking for new items..
28457442381701 << Consuming> Waiting for items, 0 items in the queue now..
28457462334135 >> Producer> Adding item..
28457462521597 >> Producing> Adding items: Item14, 1 items in the queue..
28457462610403 << Consuming> Getting item, 1 items in the queue now..
28457462688209 << Consumer> Got item: Item14
28457462737973 << Consumer> Looking for new items..
28457462785031 << Consuming> Waiting for items, 0 items in the queue now..
28457482668536 >> Producer> Adding item..
28457482814459 >> Producing> Adding items: Item15, 1 items in the queue..
28457482881415 << Consuming> Getting item, 1 items in the queue now..
28457482967677 << Consumer> Got item: Item15
28457483001300 << Consumer> Looking for new items..
28457483071632 << Consuming> Waiting for items, 0 items in the queue now..
28457502946861 >> Producer> Adding item..
28457503107813 >> Producing> Adding items: Item16, 1 items in the queue..
28457503167695 << Consuming> Getting item, 1 items in the queue now..
28457503253967 << Consumer> Got item: Item16
28457503290986 << Consumer> Looking for new items..
28457503334187 << Consuming> Waiting for items, 0 items in the queue now..
28457523238771 >> Producer> Adding item..
28457523454686 >> Producing> Adding items: Item17, 1 items in the queue..
28457523545657 << Consuming> Getting item, 1 items in the queue now..
28457523621579 << Consumer> Got item: Item17
28457523652898 << Consumer> Looking for new items..
28457523680770 << Consuming> Waiting for items, 0 items in the queue now..
28457543590223 >> Producer> Adding item..
28457543705580 >> Producing> Adding items: Item18, 1 items in the queue..
28457543754171 << Consuming> Getting item, 1 items in the queue now..
28457543821217 << Consumer> Got item: Item18
28457543854139 << Consumer> Looking for new items..
28457543888383 << Consuming> Waiting for items, 0 items in the queue now..
28457563820108 >> Producer> Adding item..
28457563956974 >> Producing> Adding items: Item19, 1 items in the queue..
28457564017849 << Consuming> Getting item, 1 items in the queue now..
28457564064957 << Consumer> Got item: Item19
28457564089393 << Consumer> Looking for new items..
28457564133085 << Consuming> Waiting for items, 0 items in the queue now..
28457578717136 Main> Stopping..