Block Image

In scenari batch ad alto volume, capita spesso di dover elaborare record che possono essere raggruppati secondo un criterio logico di partizione (ad esempio per utente, area geografica, categoria, ecc.). Organizzare l’elaborazione per partizioni può portare diversi vantaggi, soprattutto in termini di performance e gestione della concorrenza.

Con un approccio batch partizionato, è possibile:

  1. Raggruppare i record appartenenti alla stessa partizione per eseguire una sola volta operazioni comuni (come update su tabelle aggregate o chiamate esterne).
  2. Parallelizzare l’elaborazione tra partizioni indipendenti, migliorando il throughput complessivo del job.

In questo articolo ti mostro come utilizzare Spring Batch per costruire un job partizionato in grado di elaborare ogni partizione in parallelo, mantenendo al tempo stesso l’elaborazione sequenziale dei record all’interno di ciascuna partizione.

Il Problema

Durante una recente attività lavorativa, mi è capitato di dover implementare un job che leggesse dei record da una tabella DynamoDB contenente informazioni sulle spedizioni. Le spedizioni sono gestite da recapitisti distribuiti su diverse province: uno stesso recapitista può operare in più province. Per questo motivo, ogni spedizione è identificata da una coppia logica recapitista-provincia, come mostrato di seguito:

paper_delivery_idcreated_atdelivery_driver_idprovince
delivery-12025-04-14T00:00:00Z1NA
delivery-22025-04-14T01:00:00Z1NA
delivery-32025-04-15T02:00:00Z1RM
delivery-42025-04-15T00:00:00Z2MI

Parallelamente, su un’altra base dati, è presente una tabella che tiene traccia della capacità settimanale disponibile per ogni recapitista-provincia, insieme alla capacità già utilizzata:

delivery_driver_idprovincecapacityused_capacity
1NA700
1RM600
2MI800

Il funzionamento del batch è il seguente: quando viene letto un record di spedizione, il job verifica se la coppia recapitista-provincia ha ancora capacità disponibile. Se sì:

  1. La spedizione viene processata (nel nostro esempio, semplicemente loggata) ed eliminata dalla tabella.
  2. La capacità utilizzata per quella partizione viene incrementata.

Se invece la capacità è terminata, la spedizione non viene processata e rimane nella tabella per essere valutata in un’esecuzione successiva (ad esempio, la settimana dopo).
Il seguente diagramma mostra la logica di elaborazione di una spedizione:

Block Image

Analizzando i requisiti di business, emergono due vincoli chiave per la corretta progettazione del job:

  1. I record delle spedizioni facente parti della stessa partizione (recapitista-provincia) non devono essere elaborati in parallelo, altrimenti si avrebbero problemi di concorrenza sulla valutazione della capacità.
  2. I record di partizioni diverse possono essere elaborati in parallelo, non avendo dipendenze tra loro.

Mi sono detto quindi: "È il momento di rispolverare Spring Batch!"

Spring Batch e il partizionamento

Con Spring Batch è possibile creare un Master Step che ha il solo compito di dividere il lavoro in più partizioni, assegnando ciascuna a uno Slave Step eseguito in parallelo.

Nel dettaglio:

  1. Un Partitioner calcola le partizioni (nel nostro caso: le chiavi deliveryDriverId##province).
  2. Ogni partizione viene passata a uno Slave Step, che riceve i parametri tramite ExecutionContext.
  3. Gli step vengono eseguiti in parallelo tramite un TaskExecutor.
  4. In ogni Slave Step definiamo:
    • un ItemReader che legge da DynamoDB usando la pk
    • un ItemWriter che processa i record sequenzialmente (chunk-based)

Di seguito viene mostrato un diagramma del funzionamento del Master Step:

Block Image

Di seguito viene mostrato anche un diagramma del funzionamento dello Slave Step:

Block Image

Prerequisiti

  • Java 21 o superiori.
  • Installazione di Podman o Docker nella macchina per eseguire il container di LocalStack, che permette di utilizzare DynamoDB in locale.

L'applicazione Spring Batch

Puoi trovare tutto il codice sorgente dell'applicazione sul mio repository GitHub.

L'applicazione utilizza Java 21 e Spring Boot 3 con Spring Batch. Possiamo creare lo scheletro del progetto eseguendo la seguente cURL:

curl https://start.spring.io/starter.zip -d groupId=com.vincenzoracca \
      -d artifactId=spring-batch-partitioner \
      -d name=spring-batch-partitioner \
      -d packageName=com.vincenzoracca.springbatchpartitioner \
      -d dependencies=batch,actuator,web,distributed-tracing,lombok,postgresql \
      -d javaVersion=21 -d bootVersion=3.4.4 -d type=maven-project -o spring-batch-partitioner.zip

Utilizziamo anche le dipendenze di Spring MVC (Web), Spring Boot Actuator e Micrometer Tracing (Distributed Tracing) in modo tale da poter monitorare l'esecuzione del batch aggiungere il traceId e lo spanId ai log del Job. Tutti gli Slave Step appartenenti allo stesso Master Step avranno lo stesso TraceID ma diverso SpanID.
Aggiungiamo poi la dipendenza JDBC di Postgres in quanto utilizziamo questo database per memorizzare i metadati delle esecuzioni dei Job di Spring Batch.

Inoltre centralizziamo le versioni delle dipendenze di Spring Cloud AWS (che si servono per utilizzare DynamoDB), aggiungendo questo pezzo di codice al pom.xml:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws-dependencies</artifactId>
            <version>3.3.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Infine, aggiungiamo la dipendenza di Spring Cloud AWS per DyanamoDB:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-dynamodb</artifactId>
</dependency>

Step 1: creazione delle entità

Creiamo il package middleware.db.entity e scriviamo le classi che rappresentano le entità descritte a inizio articolo:

// DeliveryDriverCapacity.java
@DynamoDbBean
@Data
public class DeliveryDriverCapacity {

    private String pk; // deliveryDriverId##province
    private String deliveryDriverId;
    private String province;
    private long capacity;
    private long usedCapacity;

    @DynamoDbPartitionKey
    public String getPk() {
        return pk;
    }
}

// PaperDelivery.java
@DynamoDbBean
@Data
public class PaperDelivery {

    private String pk; // deliveryDriverId##province
    private Instant createdAt;
    private String deliveryDriverId;
    private String province;
    private String requestId;

    @DynamoDbPartitionKey
    public String getPk() {
        return pk;
    }

    @DynamoDbSortKey
    public Instant getCreatedAt() {
        return createdAt;
    }
}

Step 2: creazione del job runner

Scriviamo la classe che fa partire il batch. Creiamo il package job.runner con la seguente classe:

@Component
@RequiredArgsConstructor
public class PaperDeliveryJobRunner {

    private final Job job;

    private final JobLauncher jobLauncher;

    @Scheduled(cron = "${paper-delivery-cron}")
    public void run() throws Exception {
        var pks = ExternalService.buildPks();
        JobParameters jobParameters = new JobParametersBuilder()
                .addJobParameter("pks", pks, String.class)
                .addJobParameter("createAt", Instant.now().toString(), String.class)
                .toJobParameters();

        jobLauncher.run(job, jobParameters);
    }

}

Il metodo run viene eseguito più volte secondo una cron expression, contenuta nella annotazione @Scheduled.
Questo metodo, invoca un servizio esterno che restituisce una stringa contenente tutte le coppie deliveryDriverId-province, nel seguente formato:
delivery-driver-1##province1,deliveryd-river-2##province2....

Una volte recuperata questa stringa, questa viene passata come parametro del Job. Infine, il job viene avviato.

Step 3: creazione del partitioner utilizzato dal Master Step

Creiamo la classe partitioner utilizzata dal Master Step, che prende in input tutte le chiavi nel formato deliveryDriverId##province e per ogni chiave crea una partizione.

public class PaperDeliveryPartitioner implements Partitioner {

    private final List<String> pks;

    public PaperDeliveryPartitioner(List<String> pks) {
        this.pks = pks;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitions = HashMap.newHashMap(pks.size());

        for (int i = 0; i < pks.size(); i++) {
            ExecutionContext context = new ExecutionContext();
            context.putString("pk", pks.get(i));
            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

Il parametro gridSize è uguale alla dimensione della lista pks, per cui possiamo usare lui o pks.size() indifferentemente.

Step 4: creazione dell'item reader utilizzato dallo Slave Step

Creiamo il package job.reader e scriviamo la classe ItemReader, che dato in input un pk (cioè una stringa deliveryDriverId##province), recuperi da DynamoDB tutte le spedizioni associate a quella coppia:

@RequiredArgsConstructor
@Slf4j
public class PaperDeliveryReader implements ItemReader<PaperDelivery>, StepExecutionListener {

    private final DynamoDbTemplate dynamoDbTemplate;
    private Iterator<PaperDelivery> iterator;


    @Override
    public void beforeStep(StepExecution stepExecution) {
        ExecutionContext context = stepExecution.getExecutionContext();
        var pk = context.getString("pk");
        this.iterator = loadData(pk).iterator();
        log.info("Reading paper deliveries with pk {}", pk);
    }

    private List<PaperDelivery> loadData(String pk) {
        QueryEnhancedRequest query = QueryEnhancedRequest.builder()
                .queryConditional(QueryConditional.keyEqualTo(k -> k.partitionValue(pk))).build();
        PageIterable<PaperDelivery> pageIterable = dynamoDbTemplate.query(query, PaperDelivery.class);
        return StreamSupport.stream(pageIterable.items().spliterator(), false).toList();
    }

    @Override
    public PaperDelivery read() {
        return (iterator != null && iterator.hasNext()) ? iterator.next() : null;
    }
}

Step 5: creazione dell'item writer utilizzato dallo Slave Step

L'ItemWriter prende le spedizioni associata ad una stessa partizione deliveryDriverId##province ed effettua la logica sulla capacità. Creiamo il package job.writer e scriviamo la seguente classe:

@Slf4j
@RequiredArgsConstructor
public class LogWriter implements ItemWriter<PaperDelivery> {

    private final DynamoDbTemplate dynamoDbTemplate;

    @Override
    public void write(Chunk<? extends PaperDelivery> chunk) {
        String pk = chunk.getItems().get(0).getPk();
        DeliveryDriverCapacity deliveryDriverCapacity = dynamoDbTemplate.load(Key.builder()
                .partitionValue(pk)
                .build(), DeliveryDriverCapacity.class);

        chunk.forEach(paper -> {
            if (deliveryDriverCapacity != null &&
                    deliveryDriverCapacity.getUsedCapacity() < deliveryDriverCapacity.getCapacity()) {

                deliveryDriverCapacity.setUsedCapacity(deliveryDriverCapacity.getUsedCapacity() + 1);
                log.info("Paper Delivery DONE: {}",paper);
                dynamoDbTemplate.delete(paper);
            } else {
                log.warn("PaperDelivery DISCARDED because there is no delivery driver capacity: {}, {}", 
                        paper.getRequestId(), paper.getPk());
            }
        });

        dynamoDbTemplate.save(deliveryDriverCapacity);
    }

}

Nota che il metodo write prende come parametro un Chuck delle spedizioni. Questo significa che possiamo decidere di dividere in più chuck le spedizioni di una stessa partizione, in modo tale da alleggerire l'elaborazione. Tuttavia, per il nostro caso d'suo, è importante che i chuck non vengano eseguiti in parallelo, per evitare problemi di inconsistenza sulla valutazione della capacità.

Step 6: creazione del job completo

Infine, scriviamo il job completo, formato da due step, Master Step e Slave Step. Creiamo il package job.config e scriviamo la seguente classe:

@Configuration
@EnableBatchProcessing
@Slf4j
@RequiredArgsConstructor
public class PaperDeliveryJobConfig {


    private final DynamoDbTemplate dynamoDbTemplate;
    private final PlatformTransactionManager transactionManager;
    private final JobRepository jobRepository;

    // Used to run Slave Steps in parallel
    private TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(10);
        executor.setThreadNamePrefix("part-thread-");
        executor.initialize();
        return executor;
    }

    @Bean
    @StepScope
    public PaperDeliveryReader reader() {
        return new PaperDeliveryReader(dynamoDbTemplate);
    }

    @Bean
    public ItemWriter<PaperDelivery> writer() {
        return new LogWriter(dynamoDbTemplate);
    }


    @Bean
    public Step slaveStep() {
        return new StepBuilder("slaveStep", jobRepository)
                .<PaperDelivery, PaperDelivery>chunk(10, transactionManager)
                .reader(reader())
                .writer(writer())
                .build();
    }

    @Bean
    @JobScope
    public Step masterStep(@Value("#{jobParameters[pks]}") String pkParam) {

        List<String> pks = Arrays.asList(pkParam.split(","));

        return new StepBuilder("masterStep", jobRepository)
                .partitioner("slaveStep", new PaperDeliveryPartitioner(pks))
                .step(slaveStep())
                .taskExecutor(taskExecutor())
                .gridSize(pks.size())
                .build();
    }

    @Bean
    public Job paperDeliveryJob(Step masterStep) {
        return new JobBuilder("paperDeliveryJob", jobRepository)
                .start(masterStep)
                .build();
    }


}

In questa classe è rilevante la configurazione del Master Step. Questo prende in input il job parameter pks passato dal runner, lo splitta per ricavare le varie coppie deliveryDriverId##province e infine le passa al partitioner. Qui puoi vedere chiaramente che il gridSize viene settato col valore pks.size(). Inoltre, a questo step viene assegnato un taskExecutor per parallelizzare l'esecuzione degli Step Slave.

Step 7: inizializziamo il database

Allo startup dell'applicazione, inseriamo 2 spedizioni per ogni coppia deliveryDriverId##province. Inoltre, inizializziamo tutte le capacità a 2, tranne per le coppie 1##RM e 2##NA, che hanno capacità 1. In questo modo, per queste coppie possiamo simulare il caso di capacità terminata:

@SpringBootApplication
@EnableScheduling
@Slf4j
public class SpringBatchAwsApplication {


	public static void main(String[] args) {
		SpringApplication.run(SpringBatchAwsApplication.class, args);
	}


	@Bean
	CommandLineRunner commandLineRunner(DynamoDbTemplate dynamoDbTemplate) {
		return args -> {
			cleanDb(dynamoDbTemplate, PaperDelivery.class);
			cleanDb(dynamoDbTemplate, DeliveryDriverCapacity.class);

			String[] pks = ExternalService.buildPks().split(",");
			int numberOfRequest = 2;
			log.info("Initialization {} requestId for every {} pair deliveryDriverId##Province", numberOfRequest, pks.length);
			Stream.of(pks).parallel().forEach(pk -> {
				String[] deliveryDriverProvince = pk.split("##");
				var deliveryDriverId = deliveryDriverProvince[0];
				var province = deliveryDriverProvince[1];
				DeliveryDriverCapacity deliveryDriverCapacity = buildDeliveryDriverCapacity(pk, deliveryDriverId, province);
				dynamoDbTemplate.save(deliveryDriverCapacity);
				for(int i = 0; i < numberOfRequest; i++) {
					PaperDelivery paperDelivery = buildPaperDelivery(pk, deliveryDriverId, province);
					dynamoDbTemplate.save(paperDelivery);
				}
			});

		};
	}

	private static PaperDelivery buildPaperDelivery(String pk, String deliveryDriverId, String province) {
		PaperDelivery paperDelivery = new PaperDelivery();
		paperDelivery.setPk(pk);
		paperDelivery.setCreatedAt(Instant.now());
		paperDelivery.setRequestId(UUID.randomUUID().toString());
		paperDelivery.setDeliveryDriverId(deliveryDriverId);
		paperDelivery.setProvince(province);
		return paperDelivery;
	}

	private static DeliveryDriverCapacity buildDeliveryDriverCapacity(String pk, String deliveryDriverId, String province) {
		DeliveryDriverCapacity deliveryDriverCapacity = new DeliveryDriverCapacity();
		deliveryDriverCapacity.setPk(pk);
		deliveryDriverCapacity.setDeliveryDriverId(deliveryDriverId);
		deliveryDriverCapacity.setProvince(province);
		if(pk.equals("1##RM") || pk.equals("2##NA")) {
			deliveryDriverCapacity.setCapacity(1L);
		}
		else {
			deliveryDriverCapacity.setCapacity(2L);
		}
		deliveryDriverCapacity.setUsedCapacity(0L);
		return deliveryDriverCapacity;
	}

	private <T> void cleanDb(DynamoDbTemplate dynamoDbTemplate, Class<T> clazz) {
		PageIterable<T> pages = dynamoDbTemplate.scanAll(clazz);
		StreamSupport.stream(pages.items().spliterator(), false)
				.forEach(dynamoDbTemplate::delete);
	}



}

Step 8: avviamo l'applicazione

Nel progetto troverai un file compose.yml che permette di eseguire Postgres e DynamoDB in locale. Accertati che Docker o Podman siano avviati, dopodichè apri un terminale dalla root del progetto ed esegui il comando:

docker compose up -d # Docker

e poi:

./mvnw clean spring-boot:run

Il job partità al secondo 0 di ogni minuto, poiché nel file application.properties abbiamo:

paper-delivery-cron=0 */1 * * * *

Analizzando i log, vedremo che gli Slave step partiranno in parallelo. Inoltre, tutte le spedizioni di uno stesso Step Slave, cioè, di una stessa partizione, avranno lo stesso thread, lo stesso TraceID e lo stesso SpanID:

[  part-thread-4] [67fbd9a4bc4011ed462226cf7d5187fb-462226cf7d5187fb]...: 
         Reading paper deliveries with pk 2##NA
         
[  part-thread-4] [67fbd9a4bc4011ed462226cf7d5187fb-462226cf7d5187fb]...: 
         Paper Delivery DONE: PaperDelivery(pk=2##NA, createdAt=2025-04-13T15:34:45.630071Z, deliveryDriverId=2, province=NA, requestId=8779789a-7e5e-4c84-ad33-4a152d0c4e4c)

[  part-thread-4] [67fbd9a4bc4011ed462226cf7d5187fb-462226cf7d5187fb]...: 
         PaperDelivery DISCARDED because there is no delivery driver capacity: 1e188367-4800-4144-86a6-a2477cc4c4fc, 2##NA

Conclusioni

In questo articolo abbiamo visto come risolvere un problema comune, cioè quello di creare batch con un esecuzione partizionata. Utilizzando Spring Batch, è possibile creare un Master Step che abbia la responsabilità di partizionare ed eseguire Slave Step, in modo molto semplice e parallelizzabile.

Altri articoli su Spring: Spring.

Se vuoi creare API sicure e scalabili con Spring Boot, acquista il mio libro Spring Boot 3 API Mastery su Amazon!

Altri libri consigliati su Spring, Docker e Kubernetes: