Categorias
Java Java EE Programação

Obtenção de grande volume de dados em Jobs

Sempre que é feito um job,uma das preocupações do lado da aplicação é em como não levar todos os registros necessários para a memória. Existem algumas maneiras para se fazer isso, sendo que as duas mais comuns que vejo são queries limitadas e queries paginadas. Além dessas duas formas existe uma terceira que é limitando a quantidade de registros que o banco retorna através da opção “fetch size” no driver.

Limitação na query

Esse é um dos jeitos mais utilizados que vejo. O uso consiste em rodar a mesma query diversas vezes, retornando uma quantidade limitada de registros e, ao processar o registro, fazer um update no mesmo para que esse registro não retorne novamente na query. O job irá executar a query diversas vezes, até não ser retornado mais registros, o que indica que o job terminou o seu processamento.

Segue um exemplo de como ficaria o job:

int processed = 0;
List<OutboxEntity> toProcess = outboxRepository.retrieveLimited(quantity);
while (!toProcess.isEmpty()) {
	for (OutboxEntity item : toProcess) {
		// Do some processing
		logger.info("Processing item {}", item.getId());

		// Mark as processed
		item.setProcessed(true);

		processed++;
	}

	// Flush pending changes
	entityManager.flush();
	// Clear memory
	entityManager.clear();

	logger.info("Processed {}", processed);

	toProcess = outboxRepository.retrieveLimited(quantity);
}
return processed;

E a query:

public List<OutboxEntity> retrieveLimited(int quantity) {
	return entityManager.createQuery("SELECT o FROM OutboxEntity o WHERE o.processed = FALSE", OutboxEntity.class)
			.setMaxResults(quantity)
			.getResultList();
}

Paginação na query

Essa maneira é muito similar com a primeira abordagem, exceto pelo fato de que não é necessário atualizar o registro para que o mesmo não retorne na query. Se o registro for atualizado e não retornar mais, o job não irá processar todos os registros pois os registros ficarão “mudando” de página conforme o processamento vai ocorrendo.

Segue um exemplo de como ficaria o job:

int processed = 0;

int pageNumber = 1;
Date creationDate = new Date();
List<OutboxEntity> toProcess = outboxRepository.retrievePaging(pageNumber++, quantity, creationDate);
while (!toProcess.isEmpty()) {
	for (OutboxEntity item : toProcess) {
		// Do some processing
		logger.info("Processing item {}", item.getId());

		// Mark as processed
		item.setProcessed(true);

		processed++;
	}

	// Flush pending changes
	entityManager.flush();
	// Clear memory
	entityManager.clear();

	logger.info("Processed {}", processed);

	toProcess = outboxRepository.retrievePaging(pageNumber++, quantity, creationDate);
}
return processed;

E a query:

public List<OutboxEntity> retrievePaging(int pageNumber, int quantity, Date creationDate) {
	return entityManager.createQuery("SELECT o FROM OutboxEntity o WHERE o.retries < 3 AND o.creationDate <= :creationDate ORDER BY o.id", OutboxEntity.class)
			.setParameter("creationDate", creationDate)
			.setFirstResult((pageNumber - 1) * quantity)
			.setMaxResults(quantity)
			.getResultList();
}

Fetch size

Essa é a forma que menos vejo ser utilizada. Diferentemente das outras duas abordagens, essa só precisa executar a query no banco de dados apenas uma vez. Para garantir que o banco não irá retornar todos os registros de uma vez para aplicação, fazendo com que a memória da mesma se esgote, é necessário setar a propriedade fetch size no driver que faz a comunicação com o banco. Isso pode ser feito com um opção global, ou pode ser ajustada conforme a query que será executada.

Existem diversas maneiras de fazer isso. Utilizando JDBC puro, basta chamar o método setFetchSize do Statement com a quantidade de registros que devem ser trazidos por vez.

Utilizando Hibernate, deve ser utilizado o método setFetchSize da Query do Hibernate e, além disso, utilizar o método scroll que retorna uma instância da classe ScrollableResults. Essa classe se comporta de forma parecida com a classe ResultSet do JDBC.

No JPA 2.2, é possível obter um Stream através do método getResultStream, mas para ajustar o fetch size especificamente para a query sendo executada, é necessário utilizar o hint QueryHints.HINT_FETCH_SIZE na query.

Segue um exemplo de como ficaria o job com ScrollableResults do Hibernate:

int processed = 0;

Date creationDate = new Date();
ScrollableResults toProcess = outboxRepository.retrieveScroll(fetchSize, creationDate);
while (toProcess.next()) {
	OutboxEntity item = (OutboxEntity) toProcess.get(0);
	// Do some processing
	logger.info("Processing item {}", item.getId());

	// Mark as processed
	item.setRetries(item.getRetries() + 1);
	item.setProcessed(true);

	processed++;

	if (processed % quantity == 0) {
		// Flush pending changes
		entityManager.flush();
		// Clear memory
		entityManager.clear();

		logger.info("Processed {}", processed);
	}
}
logger.info("Processed {}", processed);
return processed;

E a query:

public ScrollableResults retrieveScroll(int fetchSize, Date creationDate) {
	return entityManager.unwrap(Session.class).createQuery("SELECT o FROM OutboxEntity o WHERE o.retries < 3 AND o.creationDate <= :creationDate", OutboxEntity.class)
			.setParameter("creationDate", creationDate)
			.setFetchSize(fetchSize)
			.scroll();
}

Já com JPA 2.2, a implementação do job fica dessa maneira:

int processed = 0;

Date creationDate = new Date();
Iterator<OutboxEntity> iterator = outboxRepository.retrieveStream(fetchSize, creationDate).iterator();
while (iterator.hasNext()) {
	OutboxEntity item = iterator.next();
	// Do some processing
	logger.info("Processing item {}", item.getId());

	// Mark as processed
	item.setRetries(item.getRetries() + 1);
	item.setProcessed(true);

	processed++;

	if (processed % quantity == 0) {
		// Flush pending changes
		entityManager.flush();
		// Clear memory
		entityManager.clear();

		logger.info("Processed {}", processed);
	}
}
logger.info("Processed {}", processed);
return processed;

E da query:

public Stream<OutboxEntity> retrieveStream(int fetchSize, Date creationDate) {
	return entityManager.createQuery("SELECT o FROM OutboxEntity o WHERE o.retries < 3 AND o.creationDate <= :creationDate", OutboxEntity.class)
			.setParameter("creationDate", creationDate)
			.setHint(QueryHints.HINT_FETCH_SIZE, fetchSize)
			.getResultStream();
}

Exemplo real

Em uma das aplicações que trabalhei, havia um job que rodava a todo momento para verificar se alguns registros já haviam sido notificados. Ele rodava a query paginada e processava todo o conjunto de dados toda vez que executava. O plano de execução não era bom e já que ele tinha que processar todos os registros, não havia necessidade de paginar. A solução nesse caso foi trocar o job para obter os dados de uma vez só, utilizando o fetch size para que a memória da aplicação não se esgotasse.

O gráfico do banco de dados durante a alteração da aplicação para que o job rodasse com a query utilizando fetch size:

Como podemos ver, antes da alteração o load do banco de dados estava em aproximadamente 5, sendo que após a alteração o mesmo passou para 3. Além disso, conseguimos ver também uma melhora na CPU livre do banco de dados.

A alteração foi simplesmente a troca da classe org.springframework.batch.item.database.JpaPagingItemReader pela org.springframework.batch.item.database.HibernateCursorItemReader do Spring Batch.

Conclusões

Foram mostradas 3 abordagens, cada uma delas tem vantagens e desvantagens. Na abordagem por limitação, é necessário se preocupar em atualizar o registro processado para que ele não volte na próxima execução da query, entretanto, na paginação é necessário que o registro não seja alterado de forma que ele mude de página ou não seja mais retornado pela query.

Além disso, a principal diferença que vejo é a quantidade de vezes que a query será executada. Na abordagem com fetch size a query só é executada uma única vez. Isso é útil por exemplo em uma query onde o plano de execução é difícil de ser melhorado. No exemplo dado, o plano de execução da query paginada é o seguinte:

Essa query possui 2 ranges scan em colunas diferentes, e o sort por uma outra coluna. Nesse caso, o melhor seria rodar o job com o fetch size pois a query só executaria uma única vez, ao invés de diversas vezes dependendo de quantas páginas forem necessárias para o job executar.

O problema da abordagem com fetch size é que se o job demorar muito para ser executado, no Oracle a aplicação acaba recebendo um erro “ORA-01555: snapshot too old”. Este artigo não tem como intuito resolver esse problema, mas ele pode ser resolvido alterando alguns parâmetros no banco de dados, reduzir o tempo de processamento do job, ou uma terceira abordagem é fazer o commit a cada 1000 itens processados (dessa forma, na próxima execução do job, ele irá retomar o processamento de onde parou, eu não novamente do começo).

Os exemplos mostrados aqui estão em https://github.com/fabionb/artigo_job

Referências

https://docs.oracle.com/cd/E18283_01/java.112/e16548/resltset.htm

https://docs.jboss.org/hibernate/orm/3.5/api/org/hibernate/ScrollableResults.html

https://stackoverflow.com/questions/4169669/is-there-are-way-to-scroll-results-with-jpa-hibernate/42412360

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

Esse site utiliza o Akismet para reduzir spam. Aprenda como seus dados de comentários são processados.

css.php