Validating Spring Kafka Payloads

See original GitHub issue

Per Gary and https://stackoverflow.com/questions/52526886/validating-spring-kafka-payloads.

I am trying to set up a service that has both a REST (POST) endpoint and a Kafka endpoint, both of which should take a JSON representation of the request object (let’s call it Foo). I would want to make sure that the Foo object is valid (via JSR-303 or whatever). So Foo might look like:

public class Foo {
    @Max(10)
    private int bar;

    // Getter and setter boilerplate
}

Setting up the REST endpoint is easy:

@PostMapping(value = "/", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<String> restEndpoint(@Valid @RequestBody Foo foo) {
    // Do stuff here
}

and if I POST, { “bar”: 9 } it processes the request, but if I post: { “bar”: 99 } I get a BAD REQUEST. All good so far!

The Kafka endpoint is easy to create (along with adding a StringJsonMessageConverter() to my KafkaListenerContainerFactory so that I get JSON->Object conversion:

@KafkaListener(topics = "fooTopic")
public void kafkaEndpoint(@Valid @Payload Foo foo) {
    // I shouldn't get here with an invalid object!!!
    logger.debug("Successfully processed the object" + foo);

    // But just to make sure, let's see if hand-validating it works
    Validator validator = localValidatorFactoryBean.getValidator();
    Set<ConstraintViolation<SlackMessage>> errors = validator.validate(foo);
    if (errors.size() > 0) {
        logger.debug("But there were validation errors!" + errors);
    }
}

But no matter what I try, I can still pass invalid requests in and they process without error.

I’ve tried both @Valid and @Validated. I’ve tried adding a MethodValidationPostProcessor bean. I’ve tried adding a Validator to the KafkaListenerEndpointRegistrar (a la the EnableKafka javadoc):

@Configuration
public class MiscellaneousConfiguration implements KafkaListenerConfigurer {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    LocalValidatorFactoryBean validatorFactory;

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        logger.debug("Configuring " + registrar);
        registrar.setMessageHandlerMethodFactory(kafkaHandlerMethodFactory());
    }

    @Bean
    public MessageHandlerMethodFactory kafkaHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setValidator(validatorFactory);
        return factory;
    }
}

I’ve now spent a few days on this, and I’m running out of other ideas. Is this even possible (without writing validation into every one of my kakfa endpoints)?

Gary responded: “The infrastructure currently does not pass a Validator into the payload argument resolver. Please open an issue on GitHub.”

Issue Analytics

  • State:closed
  • Created 5 years ago
  • Comments:9 (8 by maintainers)

github_iconTop GitHub Comments

1reaction
garyrussellcommented, Oct 1, 2018

I had forgotten about that Javadoc; thanks for reminding us.

That said, it works fine for me…

@SpringBootApplication
public class So52526886Application implements KafkaListenerConfigurer {

	public static void main(String[] args) {
		SpringApplication.run(So52526886Application.class, args);
	}

	@KafkaListener(id = "soso52526886a", topics = "so52526886")
	public void listen(@Valid @Payload Foo in) {
		System.out.println(in);
	}

	@Bean
	public StringJsonMessageConverter converter() {
		return new StringJsonMessageConverter();
	}

	@Bean
	public ApplicationRunner runner(KafkaTemplate<String, String> template) {
		return args -> {
			template.send("so52526886", "{\"bar\":42}");
		};
	}

	@Autowired
	LocalValidatorFactoryBean validatorFactory;

	@Override
	public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
		registrar.setMessageHandlerMethodFactory(kafkaHandlerMethodFactory());
	}

	@Bean
	public MessageHandlerMethodFactory kafkaHandlerMethodFactory() {
		DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
		factory.setValidator(validatorFactory);
		return factory;
	}

	public static class Foo {

		@Max(10)
		private int bar;

		public int getBar() {
			return this.bar;
		}

		public void setBar(int bar) {
			this.bar = bar;
		}

		@Override
		public String toString() {
			return "Foo [bar=" + this.bar + "]";
		}

	}

}

and

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
...

Caused by: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.example.So52526886Application.listen(com.example.So52526886Application$Foo): 1 error(s): [Field error in object ‘in’ on field ‘bar’: rejected value [42]; codes [Max.in.bar,Max.bar,Max.int,Max]; arguments [org.springframework.context.support.DefaultMessageSourceResolvable: codes [in.bar,bar]; arguments []; default message [bar],10]; default message [must be less than or equal to 10]]

Tested with Boot 2.0.5, spring-kafka 2.1.10.

0reactions
garyrussellcommented, Oct 12, 2018

Yes; I agree; I decided to do just that, and not auto-detect, due to https://jira.spring.io/browse/SPR-17319

PR coming soon…

Read more comments on GitHub >

github_iconTop Results From Across the Web

How to effectively validate kafka listener payloads
1 Answer 1 ... @KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler", containerFactory = " ...
Read more >
Spring for Apache Kafka
Starting with version 2.5.11, validation now works on payloads for @KafkaHandler methods in a class-level listener. See @KafkaListener on a Class.
Read more >
Chapter 7. Validating schemas using Kafka serializers ...
Kafka consumer applications use deserializers to validate that messages have been serialized using the correct schema, based on a specific schema ID.
Read more >
Do I have to use @Payload spring annotation to read Kafka ...
To configure the @KafkaListener to handle null payloads, you must use the @Payload annotation with required = false. If it is a tombstone...
Read more >
Kafka Dead Letter Publishing - JDriven Blog
Notice how we are using @KafkaListener @Payload validation by annotating our order payload as @Validated . We only need the following bit of ......
Read more >

github_iconTop Related Medium Post

No results found

github_iconTop Related StackOverflow Question

No results found

github_iconTroubleshoot Live Code

Lightrun enables developers to add logs, metrics and snapshots to live code - no restarts or redeploys required.
Start Free

github_iconTop Related Reddit Thread

No results found

github_iconTop Related Hackernoon Post

No results found

github_iconTop Related Tweet

No results found

github_iconTop Related Dev.to Post

No results found

github_iconTop Related Hashnode Post

No results found