Tutorial: Stream an AI response token-by-token
Most AI chat UIs do the same three things: take a prompt, stream the model's response into the DOM as it's generated, and let the user hit Stop mid-response. Doing this well usually means a WebSocket between the browser and your backend, a streaming HTTP client between your backend and the LLM provider, an incremental Markdown renderer that handles unfinished tokens, and a cancellation channel that ties the user's Stop click back to the upstream request.
djust gives you the WebSocket, the streaming-safe Markdown render, and the cancellation primitive — `start_async` for the background network call, `{% djust_markdown %}` for in-flight rendering, and `cancel_async` for Stop. The rest of the tutorial is ~70 lines of glue.
By the end you'll have a chat page that:
- Takes a prompt in a textarea, submits it, and starts streaming the response immediately — no full-render wait.
- Renders the partial response as safely rendered Markdown — half-typed code fences and `<script>` injections are escaped.
- Shows a Stop button that actually aborts the upstream HTTP request (not just hides the spinner).
- Displays a clear error state if the API call fails, with a Retry that reuses the original prompt.
| You'll learn | Documented in |
|---|---|
| `AsyncWorkMixin.start_async` for off-thread work | Loading States & Background Work |
| Reactive state from a background thread | Loading States |
| `{% djust_markdown %}` for streaming-safe rendering | Streaming Markdown |
| `cancel_async` for user-initiated cancellation | This tutorial |
| Threading + httpx pattern for true upstream cancel | This tutorial |
Prerequisites: Quickstart, the search-as-you-type tutorial (recommended — sets up the loading-state vocabulary), and an API key for any OpenAI-compatible streaming endpoint. The example uses OpenAI's SDK but any provider with an iterator-style streaming response works the same.
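The code below assumes the stock OpenAI client. If you're on a different OpenAI-compatible provider, the only change is usually where the client points. A minimal sketch, assuming a locally hosted compatible server (the URL and key here are placeholders, not part of the tutorial):

```python
from openai import OpenAI

# Placeholder values: point the SDK at whatever OpenAI-compatible server you use
# (a hosted provider, vLLM, Ollama, ...). With no arguments it reads OPENAI_API_KEY
# and talks to api.openai.com, which is what the rest of the tutorial assumes.
client = OpenAI(
    base_url="http://localhost:11434/v1",
    api_key="placeholder-key",
)
```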
What you're building
```text
You: Explain Phoenix LiveView in 3 sentences.

AI:  Phoenix LiveView is a server-driven UI library for the
     Elixir Phoenix framework. It keeps state on the server and
     pushes minimal HTML diffs to the client over a WebSocket so
     interactive features can be written without React or any
     JavaScript framework.█

     [Stop ⏹]
```
Each character of the AI's reply appears as the model emits it. The █ cursor blinks at the tail. The Stop button stops both the visual stream AND the upstream API call (so you don't pay for tokens you'll never display).
Step 1 — The view: state + the prompt handler
```python
# myapp/views.py
import threading

from djust import LiveView, action, state
from djust.mixins.async_work import AsyncWorkMixin


class ChatView(AsyncWorkMixin, LiveView):
    template_name = "chat.html"

    prompt = state("")
    response = state("")
    streaming = state(False)
    error = state("")

    # Held only for cancellation. NOT a reactive state field.
    _stop_event: threading.Event | None = None

    @action
    def submit(self, prompt: str = "", **kwargs):
        prompt = prompt.strip()
        if not prompt:
            raise ValueError("Type a prompt first.")
        self.prompt = prompt
        self.response = ""
        self.error = ""
        self.streaming = True
        self._stop_event = threading.Event()
        self.start_async(self._stream, prompt, name="llm")

    @action
    def stop(self, **kwargs):
        self.cancel_async("llm")
        if self._stop_event is not None:
            self._stop_event.set()
        self.streaming = False

    @action
    def retry(self, **kwargs):
        # Re-fire submit with the prompt that's already in state.
        self.submit(prompt=self.prompt)
```
Three things to call out:

- `AsyncWorkMixin` is a one-line mixin that adds `start_async()` / `cancel_async()` / `handle_async_result()` to the view. It's the only djust-specific thing you need beyond a normal `LiveView`.
- `_stop_event` is a regular `threading.Event` (not a `state` field), held on `self` for the duration of the streaming call. It's how the background callback knows to abort.
- `@action` wraps `submit` so the template can read `submit.error` (e.g. for the empty-prompt case) without per-handler error wiring.
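The `_stop_event` half of this is plain Python, nothing djust-specific. A standalone sketch of the same cooperative-cancellation idea, with illustrative names, in case the pattern is new:

```python
import threading
import time


def worker(stop_event: threading.Event) -> None:
    # Stand-in for the streaming loop: check the event between "chunks".
    for _ in range(100):
        if stop_event.is_set():
            print("worker: aborted cleanly")
            return
        time.sleep(0.1)  # pretend to wait on the next chunk


stop = threading.Event()
t = threading.Thread(target=worker, args=(stop,), daemon=True)
t.start()

time.sleep(0.35)  # let a few "chunks" arrive
stop.set()        # the equivalent of clicking Stop
t.join()
```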
Step 2 — The streaming callback (background thread)
```python
from openai import OpenAI

client = OpenAI()  # picks up OPENAI_API_KEY


class ChatView(AsyncWorkMixin, LiveView):
    # ... as above ...

    def _stream(self, prompt: str):
        """Runs in a background thread. start_async wires this up."""
        try:
            stream = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                stream=True,
            )
            for chunk in stream:
                if self._stop_event and self._stop_event.is_set():
                    stream.close()  # closes the upstream HTTP connection
                    break
                delta = chunk.choices[0].delta.content
                if delta:
                    self.response += delta  # reactive — triggers a patch
        except Exception as exc:
            self.error = str(exc)
        finally:
            self.streaming = False
```
What happens at runtime:

- The handler returns immediately after `start_async`. The browser sees `streaming = True` and `response = ""` in the first patch.
- The background thread starts iterating the OpenAI stream. Each `delta` is appended to `self.response`. Every reassignment is reactive, so every chunk produces a new VDOM diff and a new patch over the WebSocket.
- If `_stop_event` is set (from the user clicking Stop), the `stream.close()` aborts the upstream HTTPS connection cleanly — no more billable tokens are generated.
- On any exception, `self.error` is set and the spinner clears.
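The table at the top lists a "threading + httpx pattern" for true upstream cancel. Here the OpenAI SDK's `stream.close()` covers it, but if your provider has no SDK, the same shape works over raw `httpx`. A hedged sketch, assuming a generic SSE-style chat endpoint — the URL, auth header, payload, and line format are placeholders you'd replace with your provider's:

```python
import json

import httpx


class ChatView(AsyncWorkMixin, LiveView):
    # ... as above ...

    def _stream_httpx(self, prompt: str):
        """Same role as _stream, but over raw httpx instead of the OpenAI SDK."""
        try:
            with httpx.stream(
                "POST",
                "https://api.example.com/v1/chat/completions",   # placeholder URL
                headers={"Authorization": "Bearer <your-key>"},  # placeholder auth
                json={
                    "model": "some-model",  # placeholder model name
                    "stream": True,
                    "messages": [{"role": "user", "content": prompt}],
                },
                timeout=None,
            ) as resp:
                for line in resp.iter_lines():
                    if self._stop_event and self._stop_event.is_set():
                        break  # leaving the `with` block closes the connection
                    if not line.startswith("data: "):
                        continue
                    payload = line[len("data: "):]
                    if payload == "[DONE]":
                        break
                    delta = json.loads(payload)["choices"][0]["delta"].get("content")
                    if delta:
                        self.response += delta
        except Exception as exc:
            self.error = str(exc)
        finally:
            self.streaming = False
```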
Step 3 — The template
```html
<!-- myapp/templates/chat.html -->
{% load djust_tags %}

<form dj-submit="submit" class="chat">
  <label>
    Prompt
    <textarea name="prompt" rows="3" required dj-form-pending="disabled">{{ prompt }}</textarea>
  </label>

  {% if not streaming %}
    <button type="submit" dj-form-pending="disabled">
      <span dj-form-pending="hide">Send</span>
      <span dj-form-pending="show" hidden>Sending…</span>
    </button>
  {% else %}
    <button type="button" dj-click="stop">Stop ⏹</button>
  {% endif %}

  {% if submit.error %}
    <p role="alert" class="err">{{ submit.error }}</p>
  {% endif %}
</form>

<article class="response prose">
  {% djust_markdown response %}
  {% if streaming %}<span class="cursor" aria-hidden="true">█</span>{% endif %}
</article>

{% if error %}
  <section class="failure" role="alert">
    <p>The model returned an error: <strong>{{ error }}</strong></p>
    <button type="button" dj-click="retry">Retry</button>
  </section>
{% endif %}
```
The two non-obvious pieces:
| Piece | Why |
|---|---|
| `{% djust_markdown response %}` | Renders the current value of `response` as Markdown on every patch. Has built-in handling for partial / mid-stream Markdown (unterminated `**bold`, half-typed code fences) — see Streaming Markdown for the safety guarantees. No client JS, no DOMPurify pass. |
| `<span class="cursor">█</span>` | A blinking text cursor at the end while the stream is in flight. Pure CSS animation; renders inline because the markdown block is the immediately-preceding sibling. |
Step 4 — Cursor animation
```css
.cursor {
  display: inline-block;
  margin-left: 2px;
  animation: cursor-blink 1s steps(2) infinite;
}

@keyframes cursor-blink {
  to { opacity: 0; }
}
```
Two-step animation gives the snappy "off / on" blink rather than a slow pulse. Cosmetic — drop it if you find it distracting.
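Optionally, respect users who ask for reduced motion and switch the blink off for them:

```css
@media (prefers-reduced-motion: reduce) {
  .cursor { animation: none; }
}
```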
What just happened, end to end
```text
Browser         Server (WebSocket thread)           Background thread          OpenAI
│                           │                               │                     │
│  submit("Explain LV…")    │                               │                     │
│ ────────────────────────► │                               │                     │
│                           │ self.streaming = True         │                     │
│                           │ self.response = ""            │                     │
│                           │ self.start_async(_stream)     │                     │
│ ◄── patch (form disabled, │                               │                     │
│      spinner shown)       │                               │                     │
│                           │                               │                     │
│                           │                               │ POST /chat/...      │
│                           │                               │ stream=True         │
│                           │                               │ ───────────────────►│
│                           │ self.response += "Phoenix " ◄─│── chunk 1: "Phoenix"│
│ ◄── patch (1 chunk)       │                               │                     │
│                           │ self.response += "LiveView "◄─│── chunk 2: ...      │
│ ◄── patch (1 chunk)       │                               │                     │
│                           │ ... continues ~50 chunks ...  │                     │
│                           │                               │                     │
│  click Stop               │                               │                     │
│ ────────────────────────► │ cancel_async("llm")           │                     │
│                           │ _stop_event.set()             │                     │
│                           │                               │ stream.close() ────►│  TCP RST
│                           │ self.streaming = False        │                     │
│ ◄── patch (cursor gone)   │                               │                     │
```
Three patches per second is typical (the framework batches microsecond-spaced reassignments) — fast enough for the user to read along, slow enough that the WebSocket isn't saturated.
Where to go next
- Multi-turn chat: keep a `messages = state(default_factory=list)` history and append each user prompt + assistant response to it. Pass the full history to `client.chat.completions.create(messages=...)` so the model has context.
- Tools / function calls: when the model emits a tool call, pause streaming, run the tool server-side via another `start_async`, and resume the conversation. The same `_stream` pattern works recursively.
- Throttle the patch rate: for very chatty models you may want to coalesce 10–50 ms of deltas into one assignment. Buffer in a local string, then `self.response += buffer` periodically (see the sketch after this list).
- Server-side caching: wrap the prompt → response in `functools.lru_cache` keyed on the prompt string for demos / reproducible examples. Disable it for real chat — caching removes the response variability the model is meant to have.
- Per-user cost control: check `self.request.user.tokens_used` before calling `start_async` and refuse over-quota requests with a typed error in `self.error`.
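A sketch of the throttling idea from the list above, reusing Step 2's `_stream` shape: buffer deltas in a local string and flush at most every ~50 ms, so each flush is a single reactive assignment:

```python
import time


class ChatView(AsyncWorkMixin, LiveView):
    # ... as above ...

    def _stream(self, prompt: str):
        """Step 2's _stream, coalescing deltas so each flush is one assignment."""
        buffer = ""
        last_flush = time.monotonic()
        try:
            stream = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                stream=True,
            )
            for chunk in stream:
                if self._stop_event and self._stop_event.is_set():
                    stream.close()
                    break
                delta = chunk.choices[0].delta.content
                if delta:
                    buffer += delta
                # Flush at most ~20 times per second instead of once per chunk.
                if buffer and time.monotonic() - last_flush >= 0.05:
                    self.response += buffer
                    buffer = ""
                    last_flush = time.monotonic()
            if buffer:
                self.response += buffer  # flush whatever is left at the end
        except Exception as exc:
            self.error = str(exc)
        finally:
            self.streaming = False
```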
The five-primitive recipe (`AsyncWorkMixin`, `state`, `start_async`, `cancel_async`, `{% djust_markdown %}`) is the same shape every "long-running server-pushed UI" feature uses — streaming AI completions, live transcription, slow imports with progress, search-result re-ranking, etc. Once it clicks, dragging in another LLM provider or replacing the model with a local embedding pass is a few lines of `_stream` body.
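In the same spirit, you can develop the template and CSS without any API key by swapping the `_stream` body for a stub that fakes the token stream. A sketch with canned text (purely illustrative):

```python
import time

CANNED = (
    "Phoenix LiveView is a server-driven UI library for the Elixir Phoenix "
    "framework. It keeps state on the server and pushes minimal HTML diffs "
    "to the client over a WebSocket."
)


class ChatView(AsyncWorkMixin, LiveView):
    # ... as above ...

    def _stream(self, prompt: str):
        """Offline stand-in for the OpenAI call: emits canned words slowly."""
        try:
            for word in CANNED.split():
                if self._stop_event and self._stop_event.is_set():
                    break
                self.response += word + " "
                time.sleep(0.05)  # roughly 20 "tokens" per second
        finally:
            self.streaming = False
```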