B3 Propagation from Kafka Message
Hey 😃
Could someone tell me what I'm doing wrong? I'm running a local Kafka instance where I produce and consume messages. I intentionally produce messages without injecting a Span, but I set a header in the single-header B3 string format.
In my consumer I want to handle the message and extract the TraceId from that header. So far so good, this works. But when I try to create a new Span from there with the extracted context/TraceId as its parent, it doesn't work as expected.
Here’s the Code:
import { Kafka, KafkaMessage } from 'kafkajs';
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';
import { ConsoleSpanExporter, SimpleSpanProcessor, BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import * as opentelemetryapi from '@opentelemetry/api';
import { B3Propagator } from '@opentelemetry/propagator-b3';
import { NodeTracerProvider } from '@opentelemetry/node';

opentelemetryapi.propagation.setGlobalPropagator(new B3Propagator());

const options = {
  serviceName: 'test123',
  endpoint: 'http://localhost:14268/api/traces',
};

const traceProvider = new NodeTracerProvider();
registerInstrumentations({
  tracerProvider: traceProvider,
  instrumentations: [getNodeAutoInstrumentations()],
});

const exporter = new JaegerExporter(options);
traceProvider.addSpanProcessor(new BatchSpanProcessor(exporter) as any);
traceProvider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter()) as any);
traceProvider.register();

const tracer = opentelemetryapi.trace.getTracer('test');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:29092'],
});
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'test-group' });

const handleMessage = (message: KafkaMessage) => {
  const b3header = { b3: message.headers.b3.toString() };
  const b3propagation = opentelemetryapi.propagation.extract(
    opentelemetryapi.context.active(),
    b3header,
    opentelemetryapi.defaultTextMapGetter
  );
  const extractedSpan = opentelemetryapi.trace.getSpan(b3propagation);
  const ctx = opentelemetryapi.trace.setSpan(opentelemetryapi.context.active(), extractedSpan);
  const span = tracer.startSpan('handleMessage', undefined, ctx);
  console.log(span);
  span.end();
};

const run = async () => {
  // Producing
  await producer.connect();
  setInterval(async () => {
    await producer.send({
      topic: 'test-topic',
      messages: [
        {
          value: 'Hello KafkaJS user!',
          headers: {
            b3: 'a2202b400feeec6e-c86b4cce85426774-0',
          },
        },
      ],
    });
  }, 5000);

  // Consuming
  await consumer.connect();
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ message }: { message: KafkaMessage }) => {
      handleMessage(message);
    },
  });
};

run().catch(console.error);
Tracing Init:
import { ConsoleSpanExporter } from '@opentelemetry/sdk-trace-base';
import * as opentelemetry from '@opentelemetry/sdk-node';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';

const traceExporter = new ConsoleSpanExporter();
const sdk = new opentelemetry.NodeSDK({
  resource: new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: 'my-service',
  }),
  traceExporter,
  instrumentations: [getNodeAutoInstrumentations()],
});

sdk.start()
  .then(() => console.log('Tracing initialized'))
  .catch((error) => console.log('Error initializing tracing', error));
The span I'm logging to the console looks like this:
NonRecordingSpan {
  _spanContext: {
    traceId: '0000000000000000a2202b400feeec6e',
    spanId: '876041614bb8e8c9',
    traceFlags: 0,
    traceState: undefined
  }
}
but I'm expecting something like this (I just want the extracted traceId to be used as the trace for the new span):
{
  traceId: '0000000000000000a2202b400feeec6e',
  parentId: 'c86b4cce85426774',
  name: 'handleMessage',
  id: '876041614bb8e8c9',
  kind: 0,
  timestamp: 1636643039824327,
  duration: 3558,
  attributes: {},
  status: { code: 0 },
  events: []
}
I would appreciate any help! Thank you in advance! 😃
Hi @vmarchaud Ohh, what a stupid mistake, I completely forgot about sampling 😦 Thank you very much for supporting me and debugging this with/for me 😃 I really appreciate it! 😃 Thanks!
@dennismeister93 I finally had the time to look into your issue. Thanks a lot for the reproduction repo, that helped me a lot!
Your problem is that the span is not being sampled. By default, SDKs use a parent-based sampler, which doesn't record the span if the parent is remote and the sampled flag in the trace flags indicates that the parent span was not sampled. For more information, see https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#sampling and https://www.w3.org/TR/trace-context/#sampled-flag.
In your repo, if you swap the 0 for a 1 at the end of the B3 header, you'll get a real span 😃
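To make the effect of that last digit concrete, here is a minimal, dependency-free sketch of how the sampling state in a single-header B3 value feeds a parent-based decision. It is only an illustration, not the OpenTelemetry SDK's actual code: `parseB3Single` and `shouldRecord` are hypothetical helper names, and treating a missing sampling state as "not sampled" is a simplifying assumption.

```typescript
// Simplified illustration only; NOT the OpenTelemetry SDK's implementation.
// parseB3Single and shouldRecord are hypothetical helpers for this sketch.

interface B3Context {
  traceId: string; // left-padded to 32 hex chars, as seen in the logged span
  spanId: string;
  sampled: boolean;
}

// Parses "{traceId}-{spanId}[-{samplingState}[-{parentSpanId}]]".
function parseB3Single(header: string): B3Context | undefined {
  const pattern =
    /^([0-9a-f]{16}|[0-9a-f]{32})-([0-9a-f]{16})(?:-([01d])(?:-([0-9a-f]{16}))?)?$/;
  const match = pattern.exec(header);
  if (!match) return undefined;
  const traceId = match[1];
  const spanId = match[2];
  const state = match[3];
  return {
    // 64-bit trace ids are padded with leading zeros to 128 bits.
    traceId: traceId.length === 16 ? '0000000000000000' + traceId : traceId,
    spanId,
    // "1" = accept, "d" = debug, "0" = deny.
    // Assumption for this sketch: a missing state counts as not sampled.
    sampled: state === '1' || state === 'd',
  };
}

// Mimics a parent-based sampler with an always-on root:
// no parent -> record; otherwise follow the parent's sampled flag.
function shouldRecord(parent: B3Context | undefined): boolean {
  return parent === undefined ? true : parent.sampled;
}

const denied = parseB3Single('a2202b400feeec6e-c86b4cce85426774-0');
const accepted = parseB3Single('a2202b400feeec6e-c86b4cce85426774-1');

console.log(shouldRecord(denied));   // parent says "not sampled"
console.log(shouldRecord(accepted)); // parent says "sampled"
```

With the header from the issue (ending in `-0`), the parent's decision is "deny", which matches the `NonRecordingSpan` with `traceFlags: 0` in the log; ending the header in `-1` flips the decision.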