Wiki-Quellcode von NATS Setup

Version 11.1 von Daniel Herrmann am 2025/09/20 10:00

Zeige letzte Bearbeiter
1 NATS ist eine zentrale Komponente der Architektur. NATS ist ein verteiltes Messaging-System, welches häufig für Microservices verwendet wird.
2
3 Unser Backend (oder auch andere Systeme) publizieren Events in NATS, die dann beispielsweise in N8n über ein Trigger konsumiert werden.
4
5 = Grundlagen =
6
7 == Übersicht ==
8
9 NATS kann grob in zwei Varianten unterteilt werden, **NATS Core** und **NATS JetStream**.
10
11 NATS Core ist das Grundsystem von NATS und bietet schnelle, leichtgewichtige Kommunikationsmöglichkeiten wie Publish/Subscribe, Request/Reply und Queueing. Dabei werden Nachrichten in der Regel nur flüchtig übertragen: Geht ein Empfänger offline, gehen die Nachrichten verloren. Der Fokus liegt auf extrem niedriger Latenz und hoher Performance.
12
13 NATS JetStream erweitert NATS Core um Funktionen wie **Persistenz, wiederholtes Lesen von Nachrichten und zeitversetzte Verarbeitung**. Nachrichten können gespeichert werden, was eine zuverlässige Zustellung, Wiederholungen und komplexe Workflows ermöglicht. JetStream ist damit besonders geeignet für Szenarien, in denen eine dauerhafte Verarbeitung oder Event-Sourcing erforderlich ist, während Core eher für schnelle, vorübergehende Nachrichtenübertragung optimiert ist.
14
15 == Accounts ==
16
17 (% style="color:var(--ds-text,#333333); text-decoration:none" %)Accounts sind eine Möglichkeit, Mandanten oder Anwendungen innerhalb eines Clusters oder NATS Servers voneinander abzugrenzen. Jeder Account besitzt einen eigenen Namespace für Subjekte, verwaltet seine Benutzer und deren Berechtigungen und bietet so eine klare Trennung zwischen unterschiedlichen Workloads. Gleichzeitig lassen sich über Exports und Imports bestimmte Subjekte gezielt mit anderen Accounts teilen, sodass Kommunikation zwischen getrennten Bereichen möglich bleibt. Auf diese Weise stellen Accounts die Grundlage für Multi-Tenancy, Sicherheit und flexible Skalierung in NATS dar.
18
19 (% style="color:var(--ds-text,#333333); text-decoration:none" %)Wir verwenden zwei verschiedene Accounts:
20
21 |=(((
22 Account
23 )))|=(((
24 Berechtigungen
25 )))|=(((
26 Beschreibung
27 )))
28 |(((
29 SYS
30 )))|(((
31 SYSTEM Account
32 )))|(((
33 Wird für administrative Tätigkeiten wie beispielsweise Troubleshooting oder das Auslesen von Informationen verwendet
34 )))
35 |(((
36 MKSP
37 )))|(((
38 Regulärer Account
39 )))|(((
40 Regulärer Account für Daten
41 )))
42
43 Benutzer können jeweils nur einem Account zugeordnet werden.
44
45 == Authentifizierung ==
46
47 Die Kommunikation ist TLS verschlüsselt, es kommen die TLS Zertifikate aus [[HashiCorp KeyVault>>doc:xwiki:IN.IT Infrastruktur.Kubernetes Cluster.Hashicorp Vault.WebHome]] zum Einsatz. Diese werden dann einzelnen Accounts zugewiesen. Der NATS Server validiert das Zertifikat an Hand der Zertifikatskette und mapt dann die Zertifikatsinformationen zu einem User. Üblicherweise wird dabei ein SAN (Subject Alternative Name) in Form einer Mail-Adresse verwendet. Diese gibt es in unserem Fall nicht, sodass das Subject verwendet wird. Wichtig ist, dass das Subject in RFC2253 Form verwendet werden muss.
48
49 {{code language="shell"}}
50 $ openssl x509 -noout -text -nameopt RFC2253 -in backend.mksp-da.de.crt
51 Certificate:
52 Data:
53 ...
54 Subject: CN=backend.mksp-da.de,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE
55 {{/code}}
56
57 Diese Subject wird dann dem Account zugeordnet:
58
59 {{code}}
60 system_account: SYS
61 accounts: {
62 MKSP: {
63 jetstream: enabled
64 users: [
65 {
66 user: "CN=backend.mksp-da.de,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE"
67 },
68 {
69 user: "CN=n8n.mksp-da.de,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE"
70 },
71 {
72 user: "CN=js-admin.mksp-da.de,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE"
73 },
74 ]
75 }
76 SYS: {
77 users: [
78 {user: "CN=admin.mksp-da.de,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE"}
79 ]
80 }
81 }
82 {{/code}}
83
84 Da ein User nur einem Account zugeordnet werden kann, gibt es einen admin User für administrative Tätigkeiten sowie einen JetStream Admin User für die Account-Verwaltung.
85
86 == NATS Core mit Accounts ==
87
88 Damit die NATS Core Messages auch weiterhin funktionieren müssen publish und subscribe Berechtigungen vergeben werden. NATS wird neben dem Eventmanagement auch noch für die Task Queue mit Taskiq eingesetzt, hier werden zur Zeit noch Core Funktionalitäten verwendet. Daher muss die Berechtigung für den backend User erweitert werden:
89
90 {{code}}
91 accounts: {
92 MKSP: {
93 users: [
94 {
95 user: "CN=backend.mksp-da.de,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE"
96 permissions: {
97 publish: [">"]
98 subscribe: [">"]
99 }
100 },
101 ...
102 {{/code}}
103
104 Wichtig sind hier die zusätzlichen Permissions, die den Zugriff auf Core Events regeln.
105
106 == JetStream Konfiguration ==
107
108 In unserem Fall ist Persistenz auch bei einem zeitweisen Ausfall der Internetverbindung im Makerspace notwendig, sodass wir JetStream mit Message Retention verwenden. Dabei müssen die wichtigen Komponenten vorab angelegt werden.
109
110 === NATS CLI konfigurieren ===
111
112 Zur Administration bietet sich die Verwendung der [[NATS CLI >>url:https://github.com/nats-io/natscli||shape="rect"]]an. Diese muss einmalig eingerichtet werden, dafür sind die entsprechenden Zertifikatsdateien notwendig. Wir legen zwei Kontexte an, einen für den Cluster Admin, einen für den JetStream Admin:
113
114 {{code language="shell"}}
115 nats context add mksp-sysadmin --server ds-hetzner.mksp-da.de:4222 --description "MKSP NATS SYS Admin" --tlscert /path/to/admin.mksp-da.de.crt --tlskey /path/to/admin.mksp-da.de.key --tlsca /path/to/mksp_root_x1_ca.crt
116
117 nats context add mksp-jsadmin --server ds-hetzner.mksp-da.de:4222 --description "MKSP NATS JetStream" --tlscert /path/to/jsadmin.mksp-da.de.crt --tlskey /path/to/jsadmin.mksp-da.de.key --tlsca /path/to/mksp_root_x1_ca.crt
118 {{/code}}
119
120 Anschließend wählen wir den richtigen Kontext aus, in diesem Fall den JetStream Admin Context:
121
122 {{code language="shell"}}
123 nats context select mksp-jsadmin
124 {{/code}}
125
126 === JetStream initialisieren ===
127
128 JetStream ist in so genannte **Streams** unterteilt, welche bestimmte **Subjects** in einem Stream zusammenfassen. Wir nutzen für alle Events einen gemeinsamen Präfix (mksp), sodass wir den Stream hierfür erstellen. Die NATS Dokumentation selbst beschreibt die Konzepte sehr gut:
129
130 * JetStream im Allgemeinen: [[https:~~/~~/docs.nats.io/nats-concepts/jetstream>>url:https://docs.nats.io/nats-concepts/jetstream||shape="rect"]]
131 * JetStream Streams: [[https:~~/~~/docs.nats.io/nats-concepts/jetstream/streams>>url:https://docs.nats.io/nats-concepts/jetstream/streams||shape="rect"]]
132
133 Wir legen nun also ein Stream namens **backend_events** an:
134
135 {{code language="shell"}}
136 nats stream add backend_events --subjects "mksp.>" --retention work --max-age 7d --storage file --defaults
137 {{/code}}
138
139 === JetStream Consumer ===
140
141 Consumer sind eine Auswahl / Ansicht von Teilen eines Streams. Details finden sich wieder in der NATS Dokumentation: [[https:~~/~~/docs.nats.io/nats-concepts/jetstream/consumers>>url:https://docs.nats.io/nats-concepts/jetstream/consumers||shape="rect"]]
142
143 Consumer können entweder persistent sein (dauerhaft, ein Client kann dann den bestehenden Consumer neben) oder flüchtig (ephemeral), in diesem Fall wird der Consumer beim Erstellen des Clients (beispielsweise durch N8n) angelegt. In der Regel werden Consumer durch einen Filter auf bestimmte Subjects beschränkt, diese dürfen sich nicht überschneiden. Da die NATS JavaScript Client Library (die von n8n verwendet wird) es leider nicht sinnvoll erlaubt, ephemeral Consumer anzulegen verwenden wir dauerhafte Consumer-Namen.
144
145 |=(((
146 Backend Event
147 )))|=(((
148 Consumer Name
149 )))|=(((
150 Schema
151 )))
152 |(((
153 {{{mksp.backend.briefing.offer.cancelled}}}
154
155 (((
156
157 )))
158 )))|(((
159 backend_briefing_offer_cancelled
160 )))|(((
161 (% class="nc" %)##BriefingOfferEvent##
162 )))
163 |(((
164 {{{mksp.backend.briefing.offer.created}}}
165
166 (((
167
168 )))
169 )))|(((
170 backend_briefing_offer_created
171 )))|(((
172 (% class="nc" %)##BriefingOfferEvent##
173 )))
174 |(((
175 {{{mksp.backend.briefing.offer.updated}}}
176 )))|(((
177 backend_briefing_offer_updated
178 )))|(((
179 (% class="nc" %)##BriefingOfferEvent##
180 )))
181 |(((
182 {{{mksp.backend.briefing.created}}}
183
184 (((
185
186 )))
187 )))|(((
188 backend_briefing_created
189 )))|(((
190 (% class="n" %)##BriefingEvent##
191 )))
192 |(((
193 {{{mksp.backend.key.assigned}}}
194 )))|(((
195 backend_key_assigned
196 )))|(((
197 (% class="nc" %)##KeyEvent##
198 )))
199 |(((
200 {{{mksp.backend.key.unassigned}}}
201 )))|(((
202 backend_key_unassigned
203 )))|(((
204 (% class="nc" %)##KeyEvent##
205 )))
206 |(((
207 {{{mksp.backend.storage.reserved}}}
208 )))|(((
209 backend_storage_reserved
210 )))|(((
211 (% class="nc" %)##StorageSpaceEvent##
212 )))
213 |(((
214 {{{
215 }}}
216 )))|(((
217 backend_storage_released
218 )))|(((
219 (% class="nc" %)##StorageSpaceEvent##
220 )))
221 |(((
222 {{{
223 }}}
224 )))|(((
225 backend_storage_expired
226 )))|(((
227 (% class="nc" %){{code language="none"}}StorageSpaceEvent{{/code}}
228 )))
229 |(((
230 {{{
231 }}}
232 )))|(((
233 backend_user_converted_to_member
234 )))|(((
235 (% class="nc" %){{code language="none"}}UserEvent{{/code}}
236 )))
237 |(((
238 {{{
239 }}}
240 )))|(((
241 backend_user_converted_to_guest
242 )))|(((
243 (% class="nc" %){{code language="none"}}UserEvent{{/code}}
244 )))
245
246 Die Consumer können dann wie folgt angelegt werden:
247
248 {{code}}
249 $ nats consumer add backend_events <name> --pull --filter="<filter>" --defaults
250
251 # Beispiel
252 nats consumer add backend_events backend_briefing_offer_cancelled --pull --filter="mksp.backend.briefing.offer.cancelled" --defaults
253 nats consumer add backend_events backend_key_assigned --pull --filter="mksp.backend.key.assigned" --defaults
254 {{/code}}
255
256 = {{id name="NATSSetup-Troubleshooting"/}}Troubleshooting =
257
258 Man kann über die NATS CLI mit dem Server interagieren.
259
260 == {{id name="NATSSetup-JetStream"/}}JetStream ==
261
262 === {{id name="NATSSetup-Streams"/}}Streams ===
263
264 Man kann sich die vorhandenen Streams und die Anzahl der Nachrichten anschauen. Die Anzahl sollte üblicherweise 0 sein, wenn die Zahl höher ist, werden Events nicht richtig abgearbeitet.
265
266 {{code language="shell"}}
267 $ nats context select mksp-jsadmin
268 $ nats stream ls
269 ╭─────────────────────────────────────────────────────────────────────────────────────╮
270 │ Streams │
271 ├────────────────┬─────────────┬─────────────────────┬──────────┬──────┬──────────────┤
272 │ Name │ Description │ Created │ Messages │ Size │ Last Message │
273 ├────────────────┼─────────────┼─────────────────────┼──────────┼──────┼──────────────┤
274 │ backend_events │ │ 2025-09-08 08:36:19 │ 0 │ 0 B │ never │
275 ╰────────────────┴─────────────┴─────────────────────┴──────────┴──────┴──────────────╯
276 {{/code}}
277
278 Ebenso kann man sich - falls vorhandenen - die Nachrichten im Stream anzeigen lassen:
279
280 {{code language="shell"}}
281 $ nats context select mksp-jsadmin
282 $ nats stream view backend_events
283 [1] Subject: mksp.backend.storage.reserved Received: 2025-09-08 09:06:00
284
285 {"timestamp":"2025-09-08T09:06:00.062358","stora
286 {{/code}}
287
288 === {{id name="NATSSetup-Consumer"/}}Consumer ===
289
290 Die bestehenden Consumer kann man sich ebenfalls im Jetstream Context anzeigen lassen:
291
292 {{code}}
293 $ nats consumer ls backend_events                       
294 ╭──────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
295 │ Consumers │
296 ├──────────────────────────────────┬─────────────┬─────────────────────┬─────────────┬─────────────┬───────────────┤
297 │ Name │ Description │ Created │ Ack Pending │ Unprocessed │ Last Delivery │
298 ├──────────────────────────────────┼─────────────┼─────────────────────┼─────────────┼─────────────┼───────────────┤
299 │ backend_briefing_created │ │ 2025-09-10 08:30:12 │ 0 │ 12 │ never │
300 │ backend_briefing_offer_cancelled │ │ 2025-09-10 08:21:02 │ 0 │ 0 │ never │
301 │ backend_briefing_offer_created │ │ 2025-09-10 08:28:25 │ 0 │ 0 │ never │
302 │ backend_briefing_offer_updated │ │ 2025-09-10 08:29:19 │ 0 │ 0 │ never │
303 │ backend_key_assigned │ │ 2025-09-10 08:21:04 │ 0 │ 0 │ never │
304 ╰──────────────────────────────────┴─────────────┴─────────────────────┴─────────────┴─────────────┴───────────────╯
305 {{/code}}
306
307 In dieser Tabelle sieht man bereits die "unprocessed" Nachrichten. Ist diese Zahl größer als 0 s sieht also gut, wenn n8n die Nachrichten nicht korrekt verarbeitet. Man kann sich ebenfalls die Details eines Consumers anzeigen lassen:
308
309 {{code}}
310 $ nats consumer info backend_events backend_briefing_offer_cancelled
311
312 Information for Consumer backend_events > backend_briefing_offer_cancelled created 2025-09-09 22:53:26
313
314 Configuration:
315
316 Name: backend_briefing_offer_cancelled
317 Pull Mode: true
318 Filter Subject: mksp.backend.briefing.offer.cancelled
319 Deliver Policy: All
320 Ack Policy: Explicit
321 Ack Wait: 30.00s
322 Replay Policy: Instant
323 Max Ack Pending: 1,000
324 Max Waiting Pulls: 512
325
326 State:
327
328 Host Version: 2.11.8
329 Required API Level: 0 hosted at level 1
330 Last Delivered Message: Consumer sequence: 0 Stream sequence: 10
331 Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
332 Outstanding Acks: 0 out of maximum 1,000
333 Redelivered Messages: 0
334 Unprocessed Messages: 0
335 Waiting Pulls: 1 of maximum 512
336 {{/code}}
337
338 Insbesondere die letzten beiden Zeilen sind wichtig:
339
340 * **Unprocessed Messages** sind Nachrichten, die noch nicht abgerufen wurden. Dies sollte immer 0 sein
341 * **Waiting Pulls**: die Anzahl der verbundenen Clients. Üblicherweise sollte das nur n8n sein, also immer 1.
342
343 == {{id name="NATSSetup-Server"/}}Server ==
344
345 Man kann sich ebenfalls die aktiven Verbindungen anzeigen:
346
347 {{code language="shell"}}
348 $ nats context select mksp-sysadmin
349 $ nats server report connections   
350
351 ╭─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
352 │ Top 6 Connections out of 6 by subs │
353 ├─────┬──────────────────────────┬──────────────┬─────────┬───────────────────┬─────────┬──────────┬─────────┬──────────┬──────────┬───────────┬──────┤
354 │ CID │ Name │ Server │ Cluster │ IP │ Account │ Uptime │ In Msgs │ Out Msgs │ In Bytes │ Out Bytes │ Subs │
355 ├─────┼──────────────────────────┼──────────────┼─────────┼───────────────────┼─────────┼──────────┼─────────┼──────────┼──────────┼───────────┼──────┤
356 │ 83 │ backend_kicker │ mksp-nats-01 │ │ 172.18.0.33:52808 │ MKSP │ 2h24m18s │ 0 │ 0 │ 0 B │ 0 B │ 0 │
357 │ 93 │ backend_scheduler │ mksp-nats-01 │ │ 172.18.0.29:47620 │ MKSP │ 2h23m23s │ 0 │ 0 │ 0 B │ 0 B │ 0 │
358 │ 82 │ backend_fastapi │ mksp-nats-01 │ │ 172.18.0.33:52798 │ MKSP │ 2h24m18s │ 1 │ 1 │ 345 B │ 838 B │ 1 │
359 │ 94 │ backend_worker │ mksp-nats-01 │ │ 172.18.0.37:53550 │ MKSP │ 2h23m23s │ 0 │ 0 │ 0 B │ 0 B │ 1 │
360 │ 95 │ backend_worker │ mksp-nats-01 │ │ 172.18.0.37:53554 │ MKSP │ 2h23m23s │ 0 │ 0 │ 0 B │ 0 B │ 1 │
361 │ 111 │ NATS CLI Version 0.2.4 │ mksp-nats-01 │ │ 10.1.253.1:50712 │ SYS │ 0s │ 2 │ 1 │ 210 B │ 944 B │ 1 │
362 ├─────┼──────────────────────────┼──────────────┼─────────┼───────────────────┼─────────┼──────────┼─────────┼──────────┼──────────┼───────────┼──────┤
363 │ │ Totals for 6 connections │ │ │ │ │ │ 3 │ 2 │ 555 B │ 1.7 KiB │ 4 │
364 ╰─────┴──────────────────────────┴──────────────┴─────────┴───────────────────┴─────────┴──────────┴─────────┴──────────┴──────────┴───────────┴──────╯
365
366 ╭──────────────────────────────────────╮
367 │ Connections per server │
368 ├──────────────┬─────────┬─────────────┤
369 │ Server │ Cluster │ Connections │
370 ├──────────────┼─────────┼─────────────┤
371 │ mksp-nats-01 │ │ 6 │
372 ╰──────────────┴─────────┴─────────────╯
373 {{/code}}
374
375 Der Name ist hierbei frei vergeben, in unserem Fall im Backend Code. Wir erwarten die folgenden Clients:
376
377 |=(((
378 Client
379 )))|=(((
380 Anzahl
381 )))|=(((
382 Komponente
383 )))|=(((
384 Art
385 )))|=(((
386 Beschreibung
387 )))
388 |(((
389 backend_kicker
390 )))|(((
391 1
392 )))|(((
393 Taskiq
394 )))|(((
395 Core
396 )))|(((
397 Gibt dem Backend die Möglichkeit, asynchrone Tasks an das Taskiq backend zu delegieren
398 )))
399 |(((
400 backend_scheduler
401 )))|(((
402 1
403 )))|(((
404 Taskiq
405 )))|(((
406 Core
407 )))|(((
408 Komponente welches die regelmäßigen Jobs überwacht und triggert
409 )))
410 |(((
411 backend_fastapi
412 )))|(((
413 1
414 )))|(((
415 FastAPI
416 )))|(((
417 JetStream
418 )))|(((
419 Das Backend kann hiermit Events ausgeben, die dann von N8n abgearbeitet werden.
420 )))
421 |(((
422 backend_worker
423 )))|(((
424 2
425 )))|(((
426 Taskiq
427 )))|(((
428 Core
429 )))|(((
430 Worker für asynchrone Tasks
431 )))
432
433 Man kann sich ebenfalls die Subscriptions der jeweiligen Connections anschauen. Hierfür muss die Ausgabe auf JSON umgestellt werden:
434
435 {{code language="shell"}}
436 $ nats context select mksp-jsadmin
437 $ nats server report connections -j
438 [
439 {
440 "cid": 95,
441 "kind": "Client",
442 "type": "nats",
443 "ip": "172.18.0.37",
444 "port": 53554,
445 "start": "2025-09-08T13:35:37.398441269Z",
446 "last_activity": "2025-09-08T13:35:37.434868631Z",
447 "rtt": "479µs",
448 "uptime": "2h27m5s",
449 "idle": "2h27m5s",
450 "pending_bytes": 0,
451 "in_msgs": 0,
452 "out_msgs": 0,
453 "in_bytes": 0,
454 "out_bytes": 0,
455 "subscriptions": 1,
456 "name": "backend_worker",
457 "lang": "python3",
458 "version": "2.11.0",
459 "tls_version": "1.3",
460 "tls_cipher_suite": "TLS_AES_128_GCM_SHA256",
461 "tls_peer_certs": [
462 {
463 "subject": "CN=backend.mksp-da.de,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE",
464 "spki_sha256": "1c7826ac936267622a5bfec0359304e795a4fd552b4d76556d7fcbddf51f43fb",
465 "cert_sha256": "e25dd3084b13f5f3cb81ebb9291627bcd0a25ab51325f1cc627a2b758f03d1ca"
466 },
467 {
468 "subject": "CN=MKSP NATS Issuing CA,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE",
469 "spki_sha256": "af16c3490475a4ba65d273b7dae825622b4c1eeca5f3da808df14b3c298627a7",
470 "cert_sha256": "10ccebe5826128f50f7b007537fa8b832326d4bb14b1676588abac9a10bf7585"
471 }
472 ],
473 "authorized_user": "CN=backend.mksp-da.de,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE",
474 "account": "MKSP",
475 "subscriptions_list": [
476 "taskiq_tasks"
477 ],
478 "name_tag": "MKSP",
479 "server": {
480 "name": "mksp-nats-01",
481 "host": "0.0.0.0",
482 "id": "NDH7ZAPFJ5EORJKZDJGOUNN3NDMTLWR6V7BGBOYUIG5UE7VEIK76F6WH",
483 "ver": "2.11.6",
484 "jetstream": true,
485 "flags": 7,
486 "seq": 2937,
487 "time": "2025-09-08T16:02:42.888686229Z"
488 }
489 },
490 ]
491 {{/code}}
492
493