import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* {@link <a href=
* "https://[Log in to view URL]"
* target= "_blank"></a>}
*
* @author itammb ( Italia Massimiliano Buscati )
* @version JDK 1.15
*
*/
class Main {

    /**
     * Upper-cased leaf fragments produced by {@link CustomRecursiveAction}.
     * Wrapped in a synchronized list because leaf actions append to it while
     * running inside a {@link ForkJoinPool}. The list is never cleared, and the
     * insertion order depends on how the pool schedules the leaf actions.
     */
    private static final List<String> queue_compute = Collections.synchronizedList(new ArrayList<String>());

    /**
     * Recursive action that halves its string work load until a fragment is at
     * most {@code THRESHOLD} characters long, then stores the upper-cased
     * fragment in the shared result list.
     */
    public static class CustomRecursiveAction extends RecursiveAction {
        private static final long serialVersionUID = 1L;

        /** Work loads longer than this many characters are split in two. */
        private static final int THRESHOLD = 4;

        /** Indentation (number of spaces) used for this action's trace line. */
        int process;

        /** The text fragment this action is responsible for. */
        private final String workLoad;

        /**
         * Creates the action and prints a trace line ({@code process} spaces,
         * then {@code ">> "}, then the work load) to standard output.
         *
         * @param process  indentation of the trace line; sub-actions use {@code process + 3}
         * @param workLoad text fragment to process
         */
        public CustomRecursiveAction(int process, String workLoad) {
            this.process = process;
            this.workLoad = workLoad;
            System.out.println(pad(process) + ">> " + workLoad);
        }

        /**
         * Splits the work load while it is longer than the threshold; otherwise
         * records the upper-cased fragment.
         *
         * @see RecursiveAction#invokeAll(java.util.Collection)
         */
        @Override
        protected void compute() {
            if (workLoad.length() > THRESHOLD) {
                // Split condition met: run the two halves as sub-actions.
                invokeAll(subActions());
            } else {
                // Locale.ROOT keeps the result independent of the JVM default
                // locale (e.g. Turkish would map 'i' to a dotted capital I).
                queue_compute.add(workLoad.toUpperCase(Locale.ROOT));
            }
        }

        /**
         * Builds the two sub-actions covering the first and second half of the
         * work load.
         *
         * @return list with exactly two sub-actions
         */
        private List<CustomRecursiveAction> subActions() {
            int middle = workLoad.length() / 2;
            List<CustomRecursiveAction> halves = new ArrayList<>(2);
            halves.add(new CustomRecursiveAction(process + 3, action(0, middle)));
            halves.add(new CustomRecursiveAction(process + 3, action(middle, workLoad.length())));
            return halves;
        }

        /**
         * Returns the slice {@code [start, end)} of the work load.
         */
        private String action(int start, int end) {
            return workLoad.substring(start, end);
        }

        /**
         * Returns a string of {@code n} spaces (empty when {@code n <= 0}).
         */
        private static String pad(int n) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < n; i++) {
                sb.append(' ');
            }
            return sb.toString();
        }
    }

    /**
     * Recursive task that sums the slice {@code [start, end)} of a
     * {@code double} buffer, splitting the slice in half until it holds fewer
     * than {@code THRESHOLD} elements.
     */
    public static class Sum extends RecursiveTask<Double> {
        private static final long serialVersionUID = -69L;

        /** Slices with fewer elements than this are summed directly. */
        private static final int THRESHOLD = 100;

        private final double[] buf;
        private final int start; // first index of the processed slice (inclusive)
        private final int end;   // last index of the processed slice (exclusive)

        /**
         * @param buf   buffer to read from; it is not copied
         * @param start first index to sum (inclusive)
         * @param end   last index to sum (exclusive)
         */
        public Sum(double[] buf, int start, int end) {
            this.buf = buf;
            this.start = start;
            this.end = end;
        }

        /**
         * @return the sum of {@code buf[start..end)}
         */
        @Override
        protected Double compute() {
            if ((end - start) < THRESHOLD) {
                return sumSlice();
            }
            return splitAndSum();
        }

        /** Sums the slice sequentially. */
        private double sumSlice() {
            double total = 0;
            for (int i = start; i < end; i++) {
                total += buf[i];
            }
            return total;
        }

        /**
         * Splits the slice in half, forks the left half, computes the right
         * half in the current thread and then joins the left half, so the
         * current worker does useful work instead of only waiting.
         *
         * @see RecursiveTask#fork()
         * @see RecursiveTask#join()
         */
        private double splitAndSum() {
            // Overflow-safe midpoint (start + end could exceed Integer.MAX_VALUE).
            int middle = start + (end - start) / 2;
            Sum left = new Sum(buf, start, middle);
            Sum right = new Sum(buf, middle, end);
            left.fork();
            double rightSum = right.compute();
            double leftSum = left.join();
            return leftSum + rightSum;
        }
    }

    /**
     * Small driver holding the two demonstrations run from {@link #main(String[])}.
     */
    private static class UniTest {

        /**
         * Runs a {@link CustomRecursiveAction} over the fixed text
         * {@code "java_recursive_action"}, shuts the pool down, waits for it to
         * terminate and prints the collected fragments.
         *
         * @param pool    pool used to run the action; it is shut down by this method
         * @param process indentation of the root action's trace line
         * @throws InterruptedException if interrupted while waiting for the pool to terminate
         * @see ForkJoinPool#invoke(java.util.concurrent.ForkJoinTask)
         */
        public void simulateSubProcess(ForkJoinPool pool, int process) throws InterruptedException, ExecutionException {
            CustomRecursiveAction action = new CustomRecursiveAction(process, "java_recursive_action");
            try {
                pool.invoke(action);
            } finally {
                // Shut the pool down even when the action fails.
                shutdown(pool);
            }
            printQueueCompute();
        }

        /**
         * Fills a 5000-element buffer (even indexes hold their own index, odd
         * indexes hold -1), sums it with a {@link Sum} task, prints the result,
         * shuts the pool down and prints a sequential sum for comparison.
         *
         * @param pool pool used to run the task; it is shut down by this method
         */
        public void simulateStepProcess(ForkJoinPool pool) throws InterruptedException, ExecutionException {
            // Populate the buffer.
            double[] buf = new double[5000];
            for (int i = 0; i < buf.length; i++) {
                buf[i] = (double) (((i % 2) == 0) ? i : -1);
            }
            try {
                Sum task = new Sum(buf, 0, buf.length);
                double summation = pool.invoke(task);
                System.out.println("sommatoria: " + summation);
            } finally {
                // Shut the pool down even when the task fails.
                pool.shutdown();
            }
            chk(buf);
        }

        /** Prints the sequential sum of {@code buf} as a cross-check. */
        private void chk(double[] buf) {
            double summation = 0;
            for (int i = 0; i < buf.length; i++) {
                summation += buf[i];
            }
            System.out.println("chk: " + summation);
        }

        /**
         * Initiates shutdown of {@code pool}, blocks until it has terminated
         * and then prints a confirmation line.
         *
         * @throws InterruptedException if interrupted while waiting
         */
        private void shutdown(ExecutorService pool) throws InterruptedException {
            pool.shutdown();
            // Block on the pool itself instead of polling with short sleeps.
            while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
                // not terminated yet; keep waiting
            }
            System.out.println("SHUTDOWN - OK");
        }

        /** Prints the shared list of computed fragments. */
        private void printQueueCompute() {
            System.out.println(queue_compute.toString());
        }
    }

    /**
     * Runs the string-splitting demonstration on a fresh {@link ForkJoinPool}.
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        // Demo: asynchronous processing with the work split into sub-actions (PROCESSING).
        new UniTest().simulateSubProcess(new ForkJoinPool(), 0);
        // Demo: asynchronous processing that waits for the computed value (EXPECTED RESULT).
        //new UniTest().simulateStepProcess(new ForkJoinPool());
    }
}
// To embed this project on your website, copy the following code and paste it into your website's HTML: