III.3: SSE streaming
III.2 gave us a working /v1/chat/completions endpoint, but the response arrives all at once: the client POSTs, waits the entire turn (which can be many seconds for a long reply), and then receives the whole thing in one JSON blob. Every chat UI you've used does better than that: words appear as they're typed.
That difference is streaming. The chat pipeline from III.1 already produces text incrementally: run_chat_turn_streaming_with_prefix fires an on_delta callback once per chunk of new text. In III.2 we threw those deltas away with |_| {}. This chapter wires that callback to the network: when a request asks for stream: true, the server pushes each delta to the client the instant it's generated.
The mechanism is Server-Sent Events, and that's what OpenAI's streaming API uses, so once again we're implementing an existing standard rather than inventing one.
What Server-Sent Events are
A normal HTTP response is one body, sent once, after which the exchange is finished. Server-Sent Events (SSE) is a small twist on that: the response stays open, and the server writes a sequence of small text messages down it over time. The client reads them as a stream. It's a one-way channel (server to client), which is exactly what token streaming needs.
The wire format is plain text and almost embarrassingly simple. Each event is one or more lines, and a blank line ends the event:
data: {"first":"chunk"}

data: {"second":"chunk"}

data: [DONE]

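Every line starts with data: , the payload follows, and the blank line is the delimiter. SSE defines a few other fields (event:, id:, retry:), but OpenAI's chat-completions stream uses none of them, so for our purposes that's the entire protocol. The browser's EventSource API and every OpenAI client know how to parse it; we just have to produce it.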
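To make the framing concrete, here is a minimal sketch of an event formatter using only the standard library. The name sse_event is hypothetical and not part of the chapter's code (axum does this framing for us later); it only shows what ends up on the wire, including the SSE rule that a multi-line payload gets one data: prefix per line.

```rust
/// Formats one SSE event: every payload line gets its own `data: ` prefix,
/// and a blank line terminates the event.
/// Hypothetical helper for illustration; axum's `Event` does this for real.
fn sse_event(payload: &str) -> String {
    let mut out = String::new();
    for line in payload.split('\n') {
        out.push_str("data: ");
        out.push_str(line);
        out.push('\n');
    }
    out.push('\n'); // the blank line that ends the event
    out
}
```

Calling sse_event("[DONE]") yields the bytes data: [DONE] followed by two newlines, which is the last thing a streaming client expects to see.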
OpenAI's streaming /v1/chat/completions puts a specific thing in each data: line. Instead of one big chat.completion object, the server sends many small chat.completion.chunk objects. The first chunk announces the assistant role; each middle chunk carries a delta, a fragment of new text; the last chunk carries the finish_reason. Then a final literal data: [DONE] line signals the end. Reassembled, the deltas spell out the full reply.
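The reassembly a client performs can be sketched in a few lines. This is a simplified model, not the chapter's types: ChunkKind and reassemble are hypothetical names, and a real client would first parse each data: line as JSON before folding it like this.

```rust
/// A simplified model of what one chunk carries (hypothetical type,
/// standing in for a parsed chat.completion.chunk).
#[derive(Debug, Clone, PartialEq)]
enum ChunkKind {
    Role(String),
    Content(String),
    Finish(String),
}

/// Folds a chunk sequence back into (role, full text, finish_reason).
fn reassemble(chunks: &[ChunkKind]) -> (Option<String>, String, Option<String>) {
    let mut role = None;
    let mut text = String::new();
    let mut finish = None;
    for chunk in chunks {
        match chunk {
            ChunkKind::Role(r) => role = Some(r.clone()),
            // Content deltas are concatenated in arrival order.
            ChunkKind::Content(c) => text.push_str(c),
            ChunkKind::Finish(f) => finish = Some(f.clone()),
        }
    }
    (role, text, finish)
}
```

Feeding it a role chunk, the content deltas "2" and " + 2 = 4.", and a "stop" finish chunk gives back the role "assistant", the text "2 + 2 = 4.", and the finish reason "stop".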
So our job: when stream: true, hold the connection open, run the turn, and for every text delta emit one chat.completion.chunk event.
The crate
Two small dependencies:
futures-util = "0.3"
tokio-stream = "0.1"
axum's SSE support wants the response body as a Stream, an async sequence of items. futures-util and tokio-stream provide the adapters that turn a tokio channel receiver into exactly that kind of Stream.
The chunk types
src/openapi/types.rs gains three structs for the streaming response, the chunk and its two nested pieces:
#[derive(Debug, Serialize)]
pub struct ChatCompletionChunk {
    pub id: String,
    pub object: &'static str,
    pub created: u64,
    pub model: String,
    pub choices: Vec<ChunkChoice>,
}
#[derive(Debug, Serialize)]
pub struct ChunkChoice {
    pub index: u32,
    pub delta: ChunkDelta,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub finish_reason: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct ChunkDelta {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub role: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content: Option<String>,
}
ChatCompletionChunk mirrors the non-streaming ChatCompletionResponse from III.2, but its choice carries a delta instead of a full message, and there's no usage. The object field is always "chat.completion.chunk".
The skip_serializing_if = "Option::is_none" attributes are what make one struct serve three roles. A chunk is one of: the role announcement, a content delta, or the finish marker; in each case the other fields should be absent from the JSON entirely, not present as null. So the role-announcement chunk serializes to {"delta":{"role":"assistant"}}, a content chunk to {"delta":{"content":"4."}}, the final chunk to {"delta":{},"finish_reason":"stop"}. Each is a valid OpenAI streaming chunk.
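The difference between an absent field and a null one is easy to see side by side. The sketch below is a hand-rolled stand-in using only the standard library, not serde: both function names are hypothetical, and it does no string escaping, so it is only good for fixed values like these.

```rust
/// Renders a JSON object from optional string fields, leaving out every
/// field that is `None` (the effect `skip_serializing_if` has in serde).
/// No escaping is done; illustration only.
fn object_skipping_none(fields: &[(&str, Option<&str>)]) -> String {
    let present: Vec<String> = fields
        .iter()
        .filter_map(|&(key, value)| value.map(|v| format!("\"{}\":\"{}\"", key, v)))
        .collect();
    format!("{{{}}}", present.join(","))
}

/// Same fields, but `None` becomes an explicit `null`, which is what an
/// `Option` field serializes to without the attribute.
fn object_with_nulls(fields: &[(&str, Option<&str>)]) -> String {
    let all: Vec<String> = fields
        .iter()
        .map(|&(key, value)| match value {
            Some(v) => format!("\"{}\":\"{}\"", key, v),
            None => format!("\"{}\":null", key),
        })
        .collect();
    format!("{{{}}}", all.join(","))
}
```

For a delta with a role and no content, the first function produces {"role":"assistant"} and the second produces {"role":"assistant","content":null}. With both fields None, the first produces {}, which is the empty delta of the finish chunk.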
Building one chunk
src/openapi/router.rs gets a helper that builds a chunk and serializes it to a JSON string, the string that goes after data: on the wire:
fn build_chunk(
    id: &str,
    created: u64,
    model: &str,
    role: Option<&str>,
    content: Option<String>,
    finish_reason: Option<&str>,
) -> String {
    serde_json::to_string(&ChatCompletionChunk {
        id: id.to_owned(),
        object: "chat.completion.chunk",
        created,
        model: model.to_owned(),
        choices: vec![ChunkChoice {
            index: 0,
            delta: ChunkDelta {
                role: role.map(Into::into),
                content,
            },
            finish_reason: finish_reason.map(Into::into),
        }],
    })
    .unwrap()
}
id and created are fixed for the whole stream; every chunk of one response carries the same id, the way OpenAI does it. role, content, and finish_reason are each Option: pass Some for the one that applies to this chunk, None for the rest, and the skip_serializing_if attributes drop the absent ones. So the three call shapes are:
build_chunk(id, created, model, Some("assistant"), None, None): the opening chunk.
build_chunk(id, created, model, None, Some(delta), None): a content chunk.
build_chunk(id, created, model, None, None, Some("stop")): the final chunk.
Branching on stream
The chat_completions handler from III.2 gets one new check at the top. Right after it resolves the messages, token cap, and model name:
if req.stream {
    return stream_response(Arc::clone(&state), messages, max_tokens, model_name);
}
If the request didn't ask for streaming, the rest of the handler (the III.2 non-streaming path) runs unchanged. If it did, control hands off to stream_response, which returns an SSE response instead of a JSON one.
Producing the stream
stream_response is the heart of the chapter. It has a structural problem to solve: the model runs on a blocking thread (a long CPU-bound forward pass, per III.2's spawn_blocking discussion), but the SSE body is consumed by the async runtime as axum flushes bytes to the socket. The blocking model code and the async response writer can't share a stack; they need a pipe between them.
A tokio channel is that pipe. The blocking task is the producer, writing chunk strings in; the async side is the consumer, reading them out and flushing them to the client.
fn stream_response(
    state: Arc<ChatServerState>,
    messages: Vec<ChatTemplateMessage>,
    max_tokens: usize,
    model_name: String,
) -> Response {
    let (sse_tx, sse_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
    let id = new_completion_id();
    let created = unix_secs();
    let kv_mode = Some(state.kv_cache_mode);
unbounded_channel::<String>() creates the pipe: sse_tx is the sending half, sse_rx the receiving half, and each message is one ready-to-send chunk string. id and created are computed once and captured by every chunk in this stream.
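The producer/consumer shape is easier to see with the model and the network stripped away. The sketch below is a standard-library analog, not the chapter's code: std::sync::mpsc and a plain thread stand in for tokio's unbounded channel and spawn_blocking, and pipe_through_channel is a hypothetical name.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs a stand-in "model" on its own thread and returns everything the
/// consumer side received, in order.
fn pipe_through_channel(deltas: Vec<String>) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();

    // Producer: plays the role of the blocking model task.
    let producer = thread::spawn(move || {
        for delta in deltas {
            // Ignore send errors, as the chapter does for a hung-up client.
            let _ = tx.send(delta);
        }
        let _ = tx.send("[DONE]".to_string());
        // `tx` is dropped when this closure returns, which ends the
        // consumer's iteration below.
    });

    // Consumer: plays the role of the async side draining the receiver.
    let received: Vec<String> = rx.iter().collect();
    producer.join().expect("producer thread panicked");
    received
}
```

The property that carries over to the tokio version is that the stream ends when the sending half is dropped: once the producer finishes and its sender goes away, the consumer sees the end of the channel and the response can close.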
Next, the producer: a blocking task that runs the whole turn and feeds the channel:
    tokio::task::spawn_blocking(move || {
        // Opening chunk: role only. A failed send means the client is already
        // gone, so ignore it here just as the later sends do, rather than panic.
        let _ = sse_tx.send(build_chunk(
            &id,
            created,
            &model_name,
            Some("assistant"),
            None,
            None,
        ));
        let mut metrics = Metrics::default();
        let result = run_chat_turn_streaming_with_prefix(
            state.model.clone(),
            state.tokenizer.clone(),
            &state.backend,
            &messages,
            max_tokens,
            kv_mode,
            &mut metrics,
            |delta| {
                // One content chunk per text delta.
                let chunk =
                    build_chunk(&id, created, &model_name, None, Some(delta.to_string()), None);
                let _ = sse_tx.send(chunk);
            },
        );
The task first sends the opening chunk: Some("assistant") for the role, nothing else. Then it calls run_chat_turn_streaming_with_prefix from III.1. This is where the on_delta callback finally earns its keep: instead of the III.2 |_| {}, the closure builds a content chunk from each delta and sends it down the channel. Every time the model produces a new piece of text, a chunk lands in the pipe.
send can fail if the client disconnected and the receiver was dropped, hence let _ =. We don't treat a hung-up client as an error; the turn just runs to completion and its chunks go nowhere.
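The error that let _ = discards is also the signal a producer could use to stop early. The sketch below shows that alternative; it is not what the chapter's code does. It uses std::sync::mpsc as a stand-in (tokio's unbounded sender likewise returns an error once the receiver has been dropped), and send_until_hangup is a hypothetical name.

```rust
use std::sync::mpsc::Sender;

/// Sends tokens until the receiver is gone; returns how many were delivered.
/// Hypothetical helper: the chapter's code ignores the error instead and
/// lets the turn run to completion.
fn send_until_hangup(tx: &Sender<String>, tokens: &[&str]) -> usize {
    let mut sent = 0;
    for token in tokens {
        // `send` returns Err only when the receiving half has been dropped.
        if tx.send(token.to_string()).is_err() {
            break;
        }
        sent += 1;
    }
    sent
}
```

Whether stopping early is worth it depends on whether the generation loop can be interrupted at all; here the callback has no way to abort the turn, so ignoring the error is the simpler choice.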
When the turn finishes, the task sends a closing chunk and the [DONE] marker:
        match result {
            Ok(out) => {
                let reason = if out.hit_stop { "stop" } else { "length" };
                let _ = sse_tx.send(build_chunk(
                    &id,
                    created,
                    &model_name,
                    None,
                    None,
                    Some(reason),
                ));
            }
            Err(e) => {
                let _ = sse_tx.send(
                    serde_json::json!({
                        "error": { "message": e, "type": "invalid_request_error" }
                    })
                    .to_string(),
                );
            }
        }
        let _ = sse_tx.send("[DONE]".into());
    });
On success, a final chunk with finish_reason and no content: "stop" or "length", the same hit_stop mapping as III.2. On error, an OpenAI-shaped error object instead. Either way the last thing sent is the literal string [DONE], the marker that tells the client the stream is over.
Back on the async side, turn the channel's receiving half into the SSE response body:
    let stream = UnboundedReceiverStream::new(sse_rx)
        .map(|data| Ok::<_, Infallible>(Event::default().data(data)));
    Sse::new(stream)
        .keep_alive(KeepAlive::default())
        .into_response()
}
UnboundedReceiverStream::new(sse_rx) wraps the receiver as an async Stream of Strings. .map(...) converts each string into an SSE Event; Event::default().data(data) is axum's builder for one data: <payload> line. The Ok::<_, Infallible> wrapping is axum's required item type: a Result whose error type is Infallible, the empty type, since this stream never fails.
Sse::new(stream) makes the streaming response. keep_alive(KeepAlive::default()) makes axum send periodic SSE comment lines during quiet stretches so an idle proxy doesn't kill the connection, which matters when prefill is slow and no token has been produced yet. .into_response() finishes it.
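On the client side, keep-alive comments are simply skipped. In SSE a line that starts with a colon is a comment and carries no data. The sketch below is a minimal standard-library parser under that rule; sse_payloads is a hypothetical name, and it ignores the other SSE fields since this stream doesn't use them.

```rust
/// Extracts the `data:` payloads from raw SSE text, skipping comment lines
/// (those starting with `:`) and stopping at the `[DONE]` sentinel.
fn sse_payloads(raw: &str) -> Vec<String> {
    let mut payloads = Vec::new();
    let mut current: Vec<&str> = Vec::new();
    for line in raw.lines() {
        if line.is_empty() {
            // Blank line: the pending event, if any, is complete.
            if !current.is_empty() {
                let payload = current.join("\n");
                current.clear();
                if payload == "[DONE]" {
                    break;
                }
                payloads.push(payload);
            }
        } else if line.starts_with(':') {
            // Comment line, e.g. a keep-alive; nothing to collect.
            continue;
        } else if let Some(rest) = line.strip_prefix("data:") {
            // One optional space may follow the colon.
            current.push(rest.strip_prefix(' ').unwrap_or(rest));
        }
    }
    payloads
}
```

A comment event interleaved between two data events changes nothing in the result, which is why the server can send keep-alives freely while prefill is still running.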
Here's the full picture for one streaming request:
client                 axum (async)          spawn_blocking (model)
  │  POST stream:true        │                          │
  ├─────────────────────────>│                          │
  │                          │── start ────────────────>│
  │                          │                          │ send "assistant" chunk
  │  data: {role}            │<──── channel ────────────┤
  │<─────────────────────────┤                          │
  │                          │                          │ on_delta → content chunk
  │  data: {content "2"}     │<──── channel ────────────┤
  │<─────────────────────────┤                          │ on_delta → content chunk
  │  data: {content " + 2"}  │<──── channel ────────────┤
  │<─────────────────────────┤                          │ ... (one per delta)
  │  data: {finish "stop"}   │<──── channel ────────────┤ send finish chunk
  │<─────────────────────────┤                          │
  │  data: [DONE]            │<──── channel ────────────┤ send [DONE]
  │<─────────────────────────┤                          │
The blocking thread decodes tokens at its own pace; each chunk it produces is flushed to the socket as soon as axum's async side reads it from the channel.
Running it
Restart chat-server as in III.2, then send a request with stream: true and curl -N (the -N disables buffering so you see chunks arrive live):
curl -N -s http://127.0.0.1:8000/v1/chat/completions \
  -H 'content-type: application/json' \
  -d '{"model":"qwen3-0.6b","stream":true,"messages":[{"role":"user","content":"What is 2 + 2?"}]}'
The events arrive one at a time, each appearing the moment its token is generated:
data: {"id":"chatcmpl-17e3...","object":"chat.completion.chunk","created":1747699200,"model":"qwen3-0.6b","choices":[{"index":0,"delta":{"role":"assistant"}}]}

data: {"id":"chatcmpl-17e3...","object":"chat.completion.chunk","created":1747699200,"model":"qwen3-0.6b","choices":[{"index":0,"delta":{"content":"2"}}]}

data: {"id":"chatcmpl-17e3...","object":"chat.completion.chunk","created":1747699200,"model":"qwen3-0.6b","choices":[{"index":0,"delta":{"content":" + 2 = 4."}}]}

data: {"id":"chatcmpl-17e3...","object":"chat.completion.chunk","created":1747699200,"model":"qwen3-0.6b","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]

First the role chunk, then a content chunk per delta, then the finish chunk, then [DONE]. Concatenating the content fields gives "2 + 2 = 4.", the same answer III.2's non-streaming endpoint produced, just delivered piece by piece. The OpenAI Python SDK with stream=True consumes this exact format; so does any chat UI that speaks the OpenAI API.
Where this leaves us
The server now does both halves of the OpenAI chat API: a single JSON response, or a live SSE stream of chat.completion.chunk deltas ending in [DONE]. A real chat client can connect and watch the reply appear word by word.
It still serves one request at a time, though. spawn_blocking runs the model on a separate thread, but there's still one model and the turns run back to back; a second request waits for the first to finish. Worse, the KV cache from II.2 wasn't built for a server: it reallocates and copies the entire history on every decode step, and it has no story for many caches living at once. Before we can serve concurrent users, that cache needs replacing. The next chapter builds a paged KV cache.