The producer attempted to use a producer id which is not currently assigned to its transactional id
See original GitHub issueHello,
I’m adding transaction support for my application. I’m using the chained transaction manager mentioned here, but for simplicity I tested this with only the KafkaTransactionManager (application is written in kotlin, kafka version is 1.0.1 and tested in both Mac and Windows).
- Configuration
@Configuration
@EnableKafka
@EnableTransactionManagement
class KafkaProducerConfig {
@Autowired
private lateinit var kafkaConfig: KafkaConfig
@Bean
fun producerFactory(): ProducerFactory<String, Entity> {
val configProps = HashMap<String, Any>()
configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaConfig.bootstrap
configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
val factory = DefaultKafkaProducerFactory<String, Entity> (configProps)
factory.setTransactionIdPrefix("txId")
return factory
}
@Bean
fun kafkaTemplate(): KafkaTemplate<String, Entity> {
return KafkaTemplate(producerFactory())
}
@Bean
fun transactionManager(): KafkaTransactionManager<*, *> {
return KafkaTransactionManager(producerFactory())
}
}
- Implementation
@Autowired
private lateinit var kafka: KafkaTemplate<String, Entity>
@Transactional
@RequestMapping(value = ["/user"], method = [RequestMethod.POST])
fun user(@RequestBody user: User): User {
kafka.send("user_updated", user).get()
return user
}
I’m using docker to run an instance of zookeper and another for the kafka broker, and I tried to simulate a container crash (I killed and restart the kafka docker instance. We can see the exact time the connection was broken in logs bellow -> “Connection to node 1 could not be established. Broker may not be available”), and after the kafka service is started I have to call the above endpoint multiple times until the producer exits the error state (exception received is “The producer attempted to use a producer id which is not currently assigned to its transactional id”). I even waited more than 10 minutes before calling the endpoint to see if the producer recovered from it, but it won’t work until I call that endpoint 2/3 times.
Is there any configuration that I’m missing?
Thanks, Pedro
2018-04-23 15:17:01.548 INFO 2239 --- [nio-8010-exec-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [http://localhost:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = true
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = txId0
value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
2018-04-23 15:17:01.589 INFO 2239 --- [nio-8010-exec-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1, transactionalId=txId0] Instantiated a transactional producer.
2018-04-23 15:17:01.589 INFO 2239 --- [nio-8010-exec-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1, transactionalId=txId0] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
2018-04-23 15:17:01.589 INFO 2239 --- [nio-8010-exec-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1, transactionalId=txId0] Overriding the default acks to all since idempotence is enabled.
2018-04-23 15:17:01.642 INFO 2239 --- [nio-8010-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2018-04-23 15:17:01.642 INFO 2239 --- [nio-8010-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
2018-04-23 15:17:01.645 INFO 2239 --- [nio-8010-exec-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=txId0] ProducerId set to -1 with epoch -1
2018-04-23 15:17:01.959 INFO 2239 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=txId0] ProducerId set to 18001 with epoch 0
2018-04-23 15:17:15.032 WARN 2239 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1, transactionalId=txId0] Connection to node 1 could not be established. Broker may not be available.
2018-04-23 15:17:19.151 WARN 2239 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1, transactionalId=txId0] Error while fetching metadata with correlation id 25 : {user_updated=INVALID_REPLICATION_FACTOR}
2018-04-23 15:17:19.293 WARN 2239 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1, transactionalId=txId0] Error while fetching metadata with correlation id 26 : {user_updated=LEADER_NOT_AVAILABLE}
2018-04-23 15:17:19.402 WARN 2239 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1, transactionalId=txId0] Error while fetching metadata with correlation id 27 : {user_updated=LEADER_NOT_AVAILABLE}
2018-04-23 15:17:19.514 WARN 2239 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1, transactionalId=txId0] Error while fetching metadata with correlation id 28 : {user_updated=LEADER_NOT_AVAILABLE}
2018-04-23 15:17:19.625 WARN 2239 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1, transactionalId=txId0] Error while fetching metadata with correlation id 29 : {user_updated=LEADER_NOT_AVAILABLE}
2018-04-23 15:17:19.733 WARN 2239 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1, transactionalId=txId0] Error while fetching metadata with correlation id 30 : {user_updated=LEADER_NOT_AVAILABLE}
2018-04-23 15:17:36.906 ERROR 2239 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-1, transactionalId=txId0] Aborting producer batches due to fatal error
org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
at org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1037) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:905) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) [kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) [kafka-clients-1.0.1.jar:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
2018-04-23 15:17:36.908 ERROR 2239 --- [ad | producer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='Entity@51dc41f1' to topic user_updated:
org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
at org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1037) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:905) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) [kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) [kafka-clients-1.0.1.jar:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
2018-04-23 15:17:37.014 ERROR 2239 --- [nio-8010-exec-5] o.s.k.t.KafkaTransactionManager : Commit exception overridden by rollback exception
org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
at org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1037) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:905) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) ~[kafka-clients-1.0.1.jar:na]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_152]
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
2018-04-23 15:17:37.025 ERROR 2239 --- [nio-8010-exec-5] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state] with root cause
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
2018-04-23 15:17:44.730 ERROR 2239 --- [nio-8010-exec-7] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state] with root cause
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
2018-04-23 15:17:52.725 ERROR 2239 --- [nio-8010-exec-9] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state] with root cause
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
2018-04-23 15:17:53.618 ERROR 2239 --- [nio-8010-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state] with root cause
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
2018-04-23 15:17:55.980 ERROR 2239 --- [nio-8010-exec-3] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state] with root cause
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
2018-04-23 15:17:56.909 ERROR 2239 --- [nio-8010-exec-4] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state] with root cause
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
2018-04-23 15:17:57.768 INFO 2239 --- [nio-8010-exec-7] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [http://localhost:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = true
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = txId1
value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
2018-04-23 15:17:57.772 INFO 2239 --- [nio-8010-exec-7] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2, transactionalId=txId1] Instantiated a transactional producer.
2018-04-23 15:17:57.772 INFO 2239 --- [nio-8010-exec-7] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2, transactionalId=txId1] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
2018-04-23 15:17:57.772 INFO 2239 --- [nio-8010-exec-7] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2, transactionalId=txId1] Overriding the default acks to all since idempotence is enabled.
2018-04-23 15:17:57.780 INFO 2239 --- [nio-8010-exec-7] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
2018-04-23 15:17:57.780 INFO 2239 --- [nio-8010-exec-7] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2018-04-23 15:17:58.387 INFO 2239 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2, transactionalId=txId1] ProducerId set to 19000 with epoch 0
2018-04-23 15:17:57.783 INFO 2239 --- [nio-8010-exec-7] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2, transactionalId=txId1] ProducerId set to -1 with epoch -1
Issue Analytics
- State:
- Created 5 years ago
- Comments:9 (5 by maintainers)
Top Related StackOverflow Question
Closing for now, if you don’t mind; we can reopen if it turns out to be a spring-kafka issue.
Ok I edited the code above and moved
@Transactionalto the service method. Now after two retries it works without throwing back an exception to the user.Nevertheless if I found why it’s not working with the docker image, I will post back here for future reference.