B3 Propagation from Kafka Message

See original GitHub issue

Hey 😃

Could someone maybe answer me what I’m doing wrong? I’m running a local Kafka where I’m producing and consuming messages. I intentionally produce messages without injecting a Span but setting a header in a single B3 string format.

In my consumer I want to handle the message and extracting that TraceId from the header. So far so good this is working, but when I want to create a new Span from there with the context/TraceId as a parent it’s not working as expected.

Here’s the Code:

import { Kafka, KafkaMessage } from 'kafkajs';
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';
import { ConsoleSpanExporter, SimpleSpanProcessor, BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import * as opentelemetryapi from '@opentelemetry/api';
import { B3Propagator } from '@opentelemetry/propagator-b3';
import { NodeTracerProvider } from '@opentelemetry/node';

opentelemetryapi.propagation.setGlobalPropagator(new B3Propagator());

const options = {
    serviceName: 'test123',
    endpoint: 'http://localhost:14268/api/traces',
};

const traceProvider = new NodeTracerProvider();
registerInstrumentations({
    tracerProvider: traceProvider,
    instrumentations: [getNodeAutoInstrumentations()],
});

const exporter = new JaegerExporter(options);

traceProvider.addSpanProcessor(new BatchSpanProcessor(exporter) as any);
traceProvider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter()) as any);
traceProvider.register();

const tracer = opentelemetryapi.trace.getTracer('test');
const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['localhost:29092'],
});

const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'test-group' });

const handleMessage = (message: KafkaMessage) => {
    const b3header = { b3: message.headers.b3.toString() };
    const b3propagation = opentelemetryapi.propagation.extract(
        opentelemetryapi.context.active(),
        b3header,
        opentelemetryapi.defaultTextMapGetter
    );

    const extractedSpan = opentelemetryapi.trace.getSpan(b3propagation);
    const ctx = opentelemetryapi.trace.setSpan(opentelemetryapi.context.active(), extractedSpan);

    const span = tracer.startSpan('handleMessage', undefined, ctx);

    console.log(span);

    span.end();
};

const run = async () => {
    // Producing
    await producer.connect();
    setInterval(async () => {
        await producer.send({
            topic: 'test-topic',
            messages: [
                {
                    value: 'Hello KafkaJS user!',
                    headers: {
                        b3: 'a2202b400feeec6e-c86b4cce85426774-0',
                    },
                },
            ],
        });
    }, 5000);

    // Consuming
    await consumer.connect();
    await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });

    await consumer.run({
        eachMessage: async ({ message }: { message: KafkaMessage }) => {
            handleMessage(message);
        },
    });
};

run().catch(console.error);

Tracing Init:

import { ConsoleSpanExporter } from '@opentelemetry/sdk-trace-base';
import * as opentelemetry from '@opentelemetry/sdk-node';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';

const traceExporter = new ConsoleSpanExporter();
const sdk = new opentelemetry.NodeSDK({
    resource: new Resource({
        [SemanticResourceAttributes.SERVICE_NAME]: 'my-service',
    }),
    traceExporter,
    instrumentations: [getNodeAutoInstrumentations()],
});

sdk.start()
    .then(() => console.log('Tracing initialized'))
    .catch((error) => console.log('Error initializing tracing', error));

The span which I’m console logging looks like:

NonRecordingSpan {
  _spanContext: {
    traceId: '0000000000000000a2202b400feeec6e',
    spanId: '876041614bb8e8c9',
    traceFlags: 0,
    traceState: undefined
  }
}

but I’m expecting something like this or so (I just want to have the extracted traceId as main trace):

{
  traceId: '0000000000000000a2202b400feeec6e',
  parentId: 'c86b4cce85426774',
  name: 'handleMessage',
  id: '876041614bb8e8c9',
  kind: 0,
  timestamp: 1636643039824327,
  duration: 3558,
  attributes: {},
  status: { code: 0 },
  events: []
}

I would appreciate every help! Thank you in advance! 😃

Issue Analytics

  • State:closed
  • Created 2 years ago
  • Comments:10 (5 by maintainers)

github_iconTop GitHub Comments

1reaction
dennismeister93commented, Nov 30, 2021

Hi @vmarchaud Ohh what a stupid mistake, I completely didn’t think of that sampling 😦 Thank you very very much for supporting and debugging that with/for me 😃 I very appreciate this! 😃 Thanks!

0reactions
vmarchaudcommented, Nov 27, 2021

@dennismeister93 I finally had the time to look into your issue and thanks a lot for the reproduction repo, thats helped me a lot !

Your problem is that the span is getting sampled, by default SDKs are using this sampler which don’t record the span is the parent is remote and the tracestate flag indicate that the parent span was sampled. You can look into this: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#sampling and this https://www.w3.org/TR/trace-context/#sampled-flag for more informations,

In your repo, if you swap the 0 by a 1 on the end of the B3 header, you’ll get a real span 😃

Read more comments on GitHub >

github_iconTop Results From Across the Web

b3-propagation vs kafkacat - compare differences and reviews ...
To communicate with Kafka, you can use Kafkacat, a command-line tool that allows to produce and consume Kafka messages using a very simple...
Read more >
Spring Cloud Sleuth
Sleuth includes default logic to join a trace across HTTP or messaging boundaries. For example, HTTP propagation works over Zipkin-compatible request headers.
Read more >
B3SingleFormat (Brave 5.6.0 API) - javadoc.io
This format corresponds to the propagation key "b3" (or "B3"), which delimits ... For example, message consumers always do work in child spans,...
Read more >
Sleuth tracing is not working for transactional Kafka producers
Why tracing information do not propagate over kafka messages when Spring Sleuth is in the classpath? 0 · Some Kafka producers that are ......
Read more >
Distributed Tracing for Kafka with OpenTelemetry - Confluent
... travels through your Kafka producer, queue, and consumer. First, we will learn how context propagation works in OpenTelemetry with W3C and B3...
Read more >

github_iconTop Related Medium Post

No results found

github_iconTop Related StackOverflow Question

No results found

github_iconTroubleshoot Live Code

Lightrun enables developers to add logs, metrics and snapshots to live code - no restarts or redeploys required.
Start Free

github_iconTop Related Reddit Thread

No results found

github_iconTop Related Hackernoon Post

No results found

github_iconTop Related Tweet

No results found

github_iconTop Related Dev.to Post

No results found

github_iconTop Related Hashnode Post

No results found