The producer attempted to use a producer id which is not currently assigned to its transactional id

See original GitHub issue

Hello,

I’m adding transaction support for my application. I’m using the chained transaction manager mentioned here, but for simplicity I tested this with only the KafkaTransactionManager (application is written in kotlin, kafka version is 1.0.1 and tested in both Mac and Windows).

  1. Configuration
@Configuration
@EnableKafka
@EnableTransactionManagement
class KafkaProducerConfig {

    @Autowired
    private lateinit var kafkaConfig: KafkaConfig

    @Bean
    fun producerFactory(): ProducerFactory<String, Entity> {
        val configProps = HashMap<String, Any>()

        configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaConfig.bootstrap
        configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java

        val factory = DefaultKafkaProducerFactory<String, Entity> (configProps)
        factory.setTransactionIdPrefix("txId")

        return factory
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, Entity> {
        return KafkaTemplate(producerFactory())
    }

    @Bean
    fun transactionManager(): KafkaTransactionManager<*, *> {

        return KafkaTransactionManager(producerFactory())
    }
}
  1. Implementation
    @Autowired
    private lateinit var kafka: KafkaTemplate<String, Entity>

    @Transactional
    @RequestMapping(value = ["/user"], method = [RequestMethod.POST])
    fun user(@RequestBody user: User): User {

        kafka.send("user_updated", user).get()

        return user
    }

I’m using docker to run an instance of zookeper and another for the kafka broker, and I tried to simulate a container crash (I killed and restart the kafka docker instance. We can see the exact time the connection was broken in logs bellow -> “Connection to node 1 could not be established. Broker may not be available”), and after the kafka service is started I have to call the above endpoint multiple times until the producer exits the error state (exception received is “The producer attempted to use a producer id which is not currently assigned to its transactional id”). I even waited more than 10 minutes before calling the endpoint to see if the producer recovered from it, but it won’t work until I call that endpoint 2/3 times.

Is there any configuration that I’m missing?

Thanks, Pedro

 2018-04-23 15:17:01.548  INFO 2239 --- [nio-8010-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
 	acks = 1
 	batch.size = 16384
 	bootstrap.servers = [http://localhost:9092]
 	buffer.memory = 33554432
 	client.id = 
 	compression.type = none
 	connections.max.idle.ms = 540000
 	enable.idempotence = true
 	interceptor.classes = null
 	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
 	linger.ms = 0
 	max.block.ms = 60000
 	max.in.flight.requests.per.connection = 5
 	max.request.size = 1048576
 	metadata.max.age.ms = 300000
 	metric.reporters = []
 	metrics.num.samples = 2
 	metrics.recording.level = INFO
 	metrics.sample.window.ms = 30000
 	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
 	receive.buffer.bytes = 32768
 	reconnect.backoff.max.ms = 1000
 	reconnect.backoff.ms = 50
 	request.timeout.ms = 30000
 	retries = 0
 	retry.backoff.ms = 100
 	sasl.jaas.config = null
 	sasl.kerberos.kinit.cmd = /usr/bin/kinit
 	sasl.kerberos.min.time.before.relogin = 60000
 	sasl.kerberos.service.name = null
 	sasl.kerberos.ticket.renew.jitter = 0.05
 	sasl.kerberos.ticket.renew.window.factor = 0.8
 	sasl.mechanism = GSSAPI
 	security.protocol = PLAINTEXT
 	send.buffer.bytes = 131072
 	ssl.cipher.suites = null
 	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
 	ssl.endpoint.identification.algorithm = null
 	ssl.key.password = null
 	ssl.keymanager.algorithm = SunX509
 	ssl.keystore.location = null
 	ssl.keystore.password = null
 	ssl.keystore.type = JKS
 	ssl.protocol = TLS
 	ssl.provider = null
 	ssl.secure.random.implementation = null
 	ssl.trustmanager.algorithm = PKIX
 	ssl.truststore.location = null
 	ssl.truststore.password = null
 	ssl.truststore.type = JKS
 	transaction.timeout.ms = 60000
 	transactional.id = txId0
 	value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
 
 2018-04-23 15:17:01.589  INFO 2239 --- [nio-8010-exec-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1, transactionalId=txId0] Instantiated a transactional producer.
 2018-04-23 15:17:01.589  INFO 2239 --- [nio-8010-exec-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1, transactionalId=txId0] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
 2018-04-23 15:17:01.589  INFO 2239 --- [nio-8010-exec-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1, transactionalId=txId0] Overriding the default acks to all since idempotence is enabled.
 2018-04-23 15:17:01.642  INFO 2239 --- [nio-8010-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.1
 2018-04-23 15:17:01.642  INFO 2239 --- [nio-8010-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : c0518aa65f25317e
 2018-04-23 15:17:01.645  INFO 2239 --- [nio-8010-exec-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1, transactionalId=txId0] ProducerId set to -1 with epoch -1
 2018-04-23 15:17:01.959  INFO 2239 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1, transactionalId=txId0] ProducerId set to 18001 with epoch 0
 2018-04-23 15:17:15.032  WARN 2239 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1, transactionalId=txId0] Connection to node 1 could not be established. Broker may not be available.
 2018-04-23 15:17:19.151  WARN 2239 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1, transactionalId=txId0] Error while fetching metadata with correlation id 25 : {user_updated=INVALID_REPLICATION_FACTOR}
 2018-04-23 15:17:19.293  WARN 2239 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1, transactionalId=txId0] Error while fetching metadata with correlation id 26 : {user_updated=LEADER_NOT_AVAILABLE}
 2018-04-23 15:17:19.402  WARN 2239 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1, transactionalId=txId0] Error while fetching metadata with correlation id 27 : {user_updated=LEADER_NOT_AVAILABLE}
 2018-04-23 15:17:19.514  WARN 2239 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1, transactionalId=txId0] Error while fetching metadata with correlation id 28 : {user_updated=LEADER_NOT_AVAILABLE}
 2018-04-23 15:17:19.625  WARN 2239 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1, transactionalId=txId0] Error while fetching metadata with correlation id 29 : {user_updated=LEADER_NOT_AVAILABLE}
 2018-04-23 15:17:19.733  WARN 2239 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1, transactionalId=txId0] Error while fetching metadata with correlation id 30 : {user_updated=LEADER_NOT_AVAILABLE}
 2018-04-23 15:17:36.906 ERROR 2239 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-1, transactionalId=txId0] Aborting producer batches due to fatal error
 
 org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
 	at org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1037) ~[kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:905) ~[kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) ~[kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482) ~[kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) ~[kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) [kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) [kafka-clients-1.0.1.jar:na]
 	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
 Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
 
 2018-04-23 15:17:36.908 ERROR 2239 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='Entity@51dc41f1' to topic user_updated:
 
 org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
 	at org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1037) ~[kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:905) ~[kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) ~[kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482) ~[kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) ~[kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) [kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) [kafka-clients-1.0.1.jar:na]
 	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
 Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
 
 2018-04-23 15:17:37.014 ERROR 2239 --- [nio-8010-exec-5] o.s.k.t.KafkaTransactionManager          : Commit exception overridden by rollback exception
 
 org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
 	at org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1037) ~[kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:905) ~[kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) ~[kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482) ~[kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) ~[kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) ~[kafka-clients-1.0.1.jar:na]
 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) ~[kafka-clients-1.0.1.jar:na]
 	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_152]
 Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
 
 2018-04-23 15:17:37.025 ERROR 2239 --- [nio-8010-exec-5] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state] with root cause
 
 org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
 
 2018-04-23 15:17:44.730 ERROR 2239 --- [nio-8010-exec-7] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state] with root cause
 
 org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
 
 2018-04-23 15:17:52.725 ERROR 2239 --- [nio-8010-exec-9] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state] with root cause
 
 org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
 
 2018-04-23 15:17:53.618 ERROR 2239 --- [nio-8010-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state] with root cause
 
 org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
 
 2018-04-23 15:17:55.980 ERROR 2239 --- [nio-8010-exec-3] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state] with root cause
 
 org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
 
 2018-04-23 15:17:56.909 ERROR 2239 --- [nio-8010-exec-4] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state] with root cause
 
 org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
 
 2018-04-23 15:17:57.768  INFO 2239 --- [nio-8010-exec-7] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
 	acks = 1
 	batch.size = 16384
 	bootstrap.servers = [http://localhost:9092]
 	buffer.memory = 33554432
 	client.id = 
 	compression.type = none
 	connections.max.idle.ms = 540000
 	enable.idempotence = true
 	interceptor.classes = null
 	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
 	linger.ms = 0
 	max.block.ms = 60000
 	max.in.flight.requests.per.connection = 5
 	max.request.size = 1048576
 	metadata.max.age.ms = 300000
 	metric.reporters = []
 	metrics.num.samples = 2
 	metrics.recording.level = INFO
 	metrics.sample.window.ms = 30000
 	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
 	receive.buffer.bytes = 32768
 	reconnect.backoff.max.ms = 1000
 	reconnect.backoff.ms = 50
 	request.timeout.ms = 30000
 	retries = 0
 	retry.backoff.ms = 100
 	sasl.jaas.config = null
 	sasl.kerberos.kinit.cmd = /usr/bin/kinit
 	sasl.kerberos.min.time.before.relogin = 60000
 	sasl.kerberos.service.name = null
 	sasl.kerberos.ticket.renew.jitter = 0.05
 	sasl.kerberos.ticket.renew.window.factor = 0.8
 	sasl.mechanism = GSSAPI
 	security.protocol = PLAINTEXT
 	send.buffer.bytes = 131072
 	ssl.cipher.suites = null
 	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
 	ssl.endpoint.identification.algorithm = null
 	ssl.key.password = null
 	ssl.keymanager.algorithm = SunX509
 	ssl.keystore.location = null
 	ssl.keystore.password = null
 	ssl.keystore.type = JKS
 	ssl.protocol = TLS
 	ssl.provider = null
 	ssl.secure.random.implementation = null
 	ssl.trustmanager.algorithm = PKIX
 	ssl.truststore.location = null
 	ssl.truststore.password = null
 	ssl.truststore.type = JKS
 	transaction.timeout.ms = 60000
 	transactional.id = txId1
 	value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
 
 2018-04-23 15:17:57.772  INFO 2239 --- [nio-8010-exec-7] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-2, transactionalId=txId1] Instantiated a transactional producer.
 2018-04-23 15:17:57.772  INFO 2239 --- [nio-8010-exec-7] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-2, transactionalId=txId1] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
 2018-04-23 15:17:57.772  INFO 2239 --- [nio-8010-exec-7] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-2, transactionalId=txId1] Overriding the default acks to all since idempotence is enabled.
 2018-04-23 15:17:57.780  INFO 2239 --- [nio-8010-exec-7] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : c0518aa65f25317e
 2018-04-23 15:17:57.780  INFO 2239 --- [nio-8010-exec-7] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.1
 2018-04-23 15:17:58.387  INFO 2239 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2, transactionalId=txId1] ProducerId set to 19000 with epoch 0
 2018-04-23 15:17:57.783  INFO 2239 --- [nio-8010-exec-7] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2, transactionalId=txId1] ProducerId set to -1 with epoch -1


Issue Analytics

  • State:closed
  • Created 5 years ago
  • Comments:9 (5 by maintainers)

github_iconTop GitHub Comments

2reactions
garyrussellcommented, May 11, 2018

Closing for now, if you don’t mind; we can reopen if it turns out to be a spring-kafka issue.

0reactions
pedrohhcommented, Apr 23, 2018

Ok I edited the code above and moved @Transactional to the service method. Now after two retries it works without throwing back an exception to the user.

Nevertheless if I found why it’s not working with the docker image, I will post back here for future reference.

Read more comments on GitHub >

github_iconTop Results From Across the Web

Facing org.apache.kafka.common.errors ...
The producer attempted to use a producer id which is not currently assigned to its transactional id. It probably ...
Read more >
the producer attempted to use a producer id which ... - You.com
I'm always receiving an exception saying that "The producer attempted to use a producer id which is not currently assigned to its transactional...
Read more >
[jira] [Commented] (KAFKA-13292) InvalidPidMappingException
InvalidPidMappingException : The producer > attempted to use a producer id which is not currently assigned to its > transactional id.
Read more >
spring-projects/spring-kafka - Gitter
InvalidPidMappingException : The producer attempted to use a producer id which is not currently assigned to its transactional id.
Read more >
Why did we choose Kafka - Medium
InvalidPidMappingException : The producer attempted to use a producer id which is not currently assigned to its transactional id.
Read more >

github_iconTop Related Medium Post

No results found

github_iconTop Related StackOverflow Question

No results found

github_iconTroubleshoot Live Code

Lightrun enables developers to add logs, metrics and snapshots to live code - no restarts or redeploys required.
Start Free

github_iconTop Related Reddit Thread

No results found

github_iconTop Related Hackernoon Post

No results found

github_iconTop Related Tweet

No results found

github_iconTop Related Dev.to Post

No results found

github_iconTop Related Hashnode Post

No results found