import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * <a href="https://[Log in to view URL]"
 * target="_blank">Reference</a>
 * 
 * @author itammb ( Italia Massimiliano Buscati )
 * @version JDK 1.15
 *
 */
class Main {
	/**
	 * Shared sink for the upper-cased fragments produced by the leaf
	 * {@link CustomRecursiveAction} tasks. It is wrapped with
	 * {@link Collections#synchronizedList(List)} because leaves append to it from
	 * pool worker threads; the order of the elements may vary between runs.
	 */
	private static final List<String> QUEUE_COMPUTE = Collections.synchronizedList(new ArrayList<String>());

	/**
	 * Recursive action that halves its string work load until a piece is at most
	 * {@code THRESHOLD} characters long, then stores the upper-cased piece in the
	 * shared result list.
	 */
	public static class CustomRecursiveAction extends RecursiveAction {
		private static final long serialVersionUID = 1L;

		/** Longest work load that is processed directly instead of being split. */
		private static final int THRESHOLD = 4;

		/** Extra indentation given to the trace line of each nested level. */
		private static final int INDENT_STEP = 3;

		/** Indentation (in spaces) of the trace line printed for this action. */
		final int process;
		private final String workLoad;

		/**
		 * Creates the action and immediately prints a trace line made of
		 * {@code process} spaces, {@code ">> "} and the work load.
		 *
		 * @param process  number of leading spaces of the trace line
		 * @param workLoad text to split or, when short enough, to upper-case
		 */
		public CustomRecursiveAction(int process, String workLoad) {
			this.process = process;
			this.workLoad = workLoad;

			System.out.println(pad(process) + ">> " + workLoad);
		}

		/**
		 * Splits the work load into two sub-actions when it is longer than the
		 * threshold; otherwise records its upper-cased form.
		 *
		 * @see RecursiveAction#invokeAll(java.util.Collection)
		 */
		@Override
		protected void compute() {
			if (workLoad.length() > THRESHOLD) {
				// Split condition met: run the list of sub-actions.
				invokeAll(subActions());
			} else {
				// Locale.ROOT keeps the result independent of the default locale.
				QUEUE_COMPUTE.add(workLoad.toUpperCase(Locale.ROOT));
			}
		}

		/** Builds the two sub-actions covering the first and second half of the work load. */
		private List<CustomRecursiveAction> subActions() {
			int middle = workLoad.length() / 2;
			int childProcess = process + INDENT_STEP;

			List<CustomRecursiveAction> children = new ArrayList<>(2);
			children.add(new CustomRecursiveAction(childProcess, action(0, middle)));
			children.add(new CustomRecursiveAction(childProcess, action(middle, workLoad.length())));
			return children;
		}

		/** Returns the slice {@code [start, end)} of the work load. */
		private String action(int start, int end) {
			return workLoad.substring(start, end);
		}

		/** Returns a string of {@code n} spaces (empty when {@code n} is not positive). */
		private static String pad(int n) {
			return " ".repeat(Math.max(0, n));
		}
	}

	/**
	 * Recursive task that sums the elements of {@code buf} in the range
	 * {@code [start, end)}, splitting the range in half while it holds at least
	 * {@code THRESHOLD} elements.
	 */
	public static class Sum extends RecursiveTask<Double> {
		private static final long serialVersionUID = -69L;

		/** Ranges shorter than this are summed sequentially. */
		private static final int THRESHOLD = 100;

		private final double[] buf;
		// Portion of the buffer handled by this task: [start, end).
		private final int start;
		private final int end;

		/**
		 * @param buf   array to sum; it is read, never modified
		 * @param start index of the first element to include
		 * @param end   index one past the last element to include
		 */
		public Sum(double[] buf, int start, int end) {
			this.buf = buf;
			this.start = start;
			this.end = end;
		}

		/**
		 * Computes the sum of the assigned range. The result is built from local
		 * values only, so calling this method again yields the same value.
		 *
		 * @return the sum of {@code buf[start..end)}
		 */
		@Override
		protected Double compute() {
			if ((end - start) < THRESHOLD) {
				return task(); // small range: process the buffer content directly
			}
			return subTask(); // large range: divide the buffer into sub-tasks
		}

		/** Sums the assigned range sequentially. */
		private double task() {
			double total = 0;
			for (int i = start; i < end; i++) {
				total += buf[i];
			}
			return total;
		}

		/**
		 * Splits the range in two, forks the first half, computes the second half
		 * in the current thread and then joins the forked half.
		 *
		 * @see RecursiveTask#fork()
		 * @see RecursiveTask#join()
		 */
		private double subTask() {
			// Unsigned shift avoids int overflow of start + end on very large ranges.
			int middle = (start + end) >>> 1;

			Sum subtaskA = new Sum(buf, start, middle);
			Sum subtaskB = new Sum(buf, middle, end);

			subtaskA.fork();
			// Work on the second half here instead of idling until both forks finish.
			double partB = subtaskB.compute();
			double partA = subtaskA.join();

			return partA + partB;
		}
	}

	/** Small manual drivers for the two tasks above. */
	private static class UniTest {
		/**
		 * Runs a {@link CustomRecursiveAction} on {@code "java_recursive_action"},
		 * shuts the pool down and prints the collected fragments.
		 *
		 * @param pool    pool that executes the action; it is shut down by this method
		 * @param process indentation of the root trace line
		 * @throws InterruptedException if interrupted while waiting for the pool to terminate
		 * @see ForkJoinPool#invoke(java.util.concurrent.ForkJoinTask)
		 */
		public void simulateSubProcess(ForkJoinPool pool, int process) throws InterruptedException, ExecutionException {
			CustomRecursiveAction action = new CustomRecursiveAction(process, "java_recursive_action");

			pool.invoke(action);

			shutdown(pool);

			printQueueCompute();
		}

		/**
		 * Sums a 5000-element buffer with a {@link Sum} task, prints the result,
		 * requests pool shutdown and prints a sequentially computed check value.
		 *
		 * @param pool pool that executes the task; shutdown is requested by this method
		 */
		public void simulateStepProcess(ForkJoinPool pool) throws InterruptedException, ExecutionException {
			// Fill the buffer: even indices hold the index itself, odd indices hold -1.
			double[] buf = new double[5000];
			for (int i = 0; i < buf.length; i++) {
				buf[i] = (i % 2 == 0) ? i : -1;
			}

			Sum task = new Sum(buf, 0, buf.length);

			double summation = pool.invoke(task);

			System.out.println("sommatoria: " + summation);

			pool.shutdown();

			chk(buf);
		}

		/** Prints the sequential sum of {@code buf} for comparison with the parallel result. */
		private void chk(double[] buf) {
			double summation = 0;
			for (double value : buf) {
				summation += value;
			}

			System.out.println("chk: " + summation);
		}

		/**
		 * Requests shutdown and blocks until the pool has terminated, then prints
		 * a confirmation line.
		 *
		 * @throws InterruptedException if interrupted while waiting
		 */
		private void shutdown(ExecutorService pool) throws InterruptedException {
			pool.shutdown();

			// Block on the pool itself rather than polling isTerminated() with sleeps.
			while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
				// not terminated yet: keep waiting
			}

			System.out.println("SHUTDOWN - OK");
		}

		/** Prints the fragments collected so far. */
		private void printQueueCompute() {
			System.out.println(QUEUE_COMPUTE.toString());
		}
	}

	public static void main(String args[]) throws InterruptedException, ExecutionException, TimeoutException {
		// Manual test - asynchronous processing with work splitting (PROCESSING)
		new UniTest().simulateSubProcess(new ForkJoinPool(), 0);

		// Manual test - asynchronous processing that waits for the outcome (EXPECTED RESULT)
		//new UniTest().simulateStepProcess(new ForkJoinPool());
	}
}

Embed on website

To embed this project on your website, copy the following code and paste it into your website's HTML: