Wiki-Quellcode von NATS - Technisches Setup

Version 14.1 von Daniel Herrmann am 2025/10/02 18:48

Zeige letzte Bearbeiter
1 NATS ist eine zentrale Komponente der Architektur. NATS ist ein verteiltes Messaging-System, welches häufig für Microservices verwendet wird.
2
3 Unser Backend (oder auch andere Systeme) publizieren Events in NATS, die dann beispielsweise in N8n über ein Trigger konsumiert werden.
4
5 = Grundlagen =
6
7 == Übersicht ==
8
9 NATS kann grob in zwei Varianten unterteilt werden, **NATS Core** und **NATS JetStream**.
10
11 NATS Core ist das Grundsystem von NATS und bietet schnelle, leichtgewichtige Kommunikationsmöglichkeiten wie Publish/Subscribe, Request/Reply und Queueing. Dabei werden Nachrichten in der Regel nur flüchtig übertragen: Geht ein Empfänger offline, gehen die Nachrichten verloren. Der Fokus liegt auf extrem niedriger Latenz und hoher Performance.
12
13 NATS JetStream erweitert NATS Core um Funktionen wie **Persistenz, wiederholtes Lesen von Nachrichten und zeitversetzte Verarbeitung**. Nachrichten können gespeichert werden, was eine zuverlässige Zustellung, Wiederholungen und komplexe Workflows ermöglicht. JetStream ist damit besonders geeignet für Szenarien, in denen eine dauerhafte Verarbeitung oder Event-Sourcing erforderlich ist, während Core eher für schnelle, vorübergehende Nachrichtenübertragung optimiert ist.
14
15 == Accounts ==
16
17 (% style="color:var(--ds-text,#333333); text-decoration:none" %)Accounts sind eine Möglichkeit, Mandanten oder Anwendungen innerhalb eines Clusters oder NATS Servers voneinander abzugrenzen. Jeder Account besitzt einen eigenen Namespace für Subjekte, verwaltet seine Benutzer und deren Berechtigungen und bietet so eine klare Trennung zwischen unterschiedlichen Workloads. Gleichzeitig lassen sich über Exports und Imports bestimmte Subjekte gezielt mit anderen Accounts teilen, sodass Kommunikation zwischen getrennten Bereichen möglich bleibt. Auf diese Weise stellen Accounts die Grundlage für Multi-Tenancy, Sicherheit und flexible Skalierung in NATS dar.
18
19 (% style="color:var(--ds-text,#333333); text-decoration:none" %)Wir verwenden zwei verschiedene Accounts:
20
21 |=(((
22 Account
23 )))|=(((
24 Berechtigungen
25 )))|=(((
26 Beschreibung
27 )))
28 |(((
29 SYS
30 )))|(((
31 SYSTEM Account
32 )))|(((
33 Wird für administrative Tätigkeiten wie beispielsweise Troubleshooting oder das Auslesen von Informationen verwendet
34 )))
35 |(((
36 MKSP
37 )))|(((
38 Regulärer Account
39 )))|(((
40 Regulärer Account für Daten
41 )))
42
43 Benutzer können jeweils nur einem Account zugeordnet werden.
44
45 == Authentifizierung ==
46
47 Die Kommunikation ist TLS verschlüsselt, es kommen die TLS Zertifikate aus [[HashiCorp KeyVault>>doc:xwiki:IN.IT Infrastruktur.Kubernetes Cluster.Hashicorp Vault.WebHome]] zum Einsatz. Diese werden dann einzelnen Accounts zugewiesen. Der NATS Server validiert das Zertifikat an Hand der Zertifikatskette und mapt dann die Zertifikatsinformationen zu einem User. Üblicherweise wird dabei ein SAN (Subject Alternative Name) in Form einer Mail-Adresse verwendet. Diese gibt es in unserem Fall nicht, sodass das Subject verwendet wird. Wichtig ist, dass das Subject in RFC2253 Form verwendet werden muss.
48
49 {{code language="shell"}}
50 $ openssl x509 -noout -text -nameopt RFC2253 -in backend.mksp-da.de.crt
51 Certificate:
52 Data:
53 ...
54 Subject: CN=backend.mksp-da.de,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE
55 {{/code}}
56
57 Diese Subject wird dann dem Account zugeordnet:
58
59 {{code}}
60 system_account: SYS
61 accounts: {
62 MKSP: {
63 jetstream: enabled
64 users: [
65 {
66 user: "CN=backend.mksp-da.de,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE"
67 },
68 {
69 user: "CN=n8n.mksp-da.de,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE"
70 },
71 {
72 user: "CN=js-admin.mksp-da.de,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE"
73 },
74 ]
75 }
76 SYS: {
77 users: [
78 {user: "CN=admin.mksp-da.de,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE"}
79 ]
80 }
81 }
82 {{/code}}
83
84 Da ein User nur einem Account zugeordnet werden kann, gibt es einen admin User für administrative Tätigkeiten sowie einen JetStream Admin User für die Account-Verwaltung.
85
86 == NATS Core mit Accounts ==
87
88 Damit die NATS Core Messages auch weiterhin funktionieren müssen publish und subscribe Berechtigungen vergeben werden. NATS wird neben dem Eventmanagement auch noch für die Task Queue mit Taskiq eingesetzt, hier werden zur Zeit noch Core Funktionalitäten verwendet. Daher muss die Berechtigung für den backend User erweitert werden:
89
90 {{code}}
91 accounts: {
92 MKSP: {
93 users: [
94 {
95 user: "CN=backend.mksp-da.de,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE"
96 permissions: {
97 publish: [">"]
98 subscribe: [">"]
99 }
100 },
101 ...
102 {{/code}}
103
104 Wichtig sind hier die zusätzlichen Permissions, die den Zugriff auf Core Events regeln.
105
106 == JetStream Konfiguration ==
107
108 In unserem Fall ist Persistenz auch bei einem zeitweisen Ausfall der Internetverbindung im Makerspace notwendig, sodass wir JetStream mit Message Retention verwenden. Dabei müssen die wichtigen Komponenten vorab angelegt werden.
109
110 === NATS CLI konfigurieren ===
111
112 Zur Administration bietet sich die Verwendung der [[NATS CLI >>url:https://github.com/nats-io/natscli||shape="rect"]]an. Diese muss einmalig eingerichtet werden, dafür sind die entsprechenden Zertifikatsdateien notwendig. Wir legen zwei Kontexte an, einen für den Cluster Admin, einen für den JetStream Admin:
113
114 {{code language="shell"}}
115 nats context add mksp-sysadmin --server ds-hetzner.mksp-da.de:4222 --description "MKSP NATS SYS Admin" --tlscert /path/to/admin.mksp-da.de.crt --tlskey /path/to/admin.mksp-da.de.key --tlsca /path/to/mksp_root_x1_ca.crt
116
117 nats context add mksp-jsadmin --server ds-hetzner.mksp-da.de:4222 --description "MKSP NATS JetStream" --tlscert /path/to/jsadmin.mksp-da.de.crt --tlskey /path/to/jsadmin.mksp-da.de.key --tlsca /path/to/mksp_root_x1_ca.crt
118 {{/code}}
119
120 Anschließend wählen wir den richtigen Kontext aus, in diesem Fall den JetStream Admin Context:
121
122 {{code language="shell"}}
123 nats context select mksp-jsadmin
124 {{/code}}
125
126 === JetStream initialisieren ===
127
128 JetStream ist in so genannte **Streams** unterteilt, welche bestimmte **Subjects** in einem Stream zusammenfassen. Wir nutzen für alle Events einen gemeinsamen Präfix (mksp), sodass wir den Stream hierfür erstellen. Die NATS Dokumentation selbst beschreibt die Konzepte sehr gut:
129
130 * JetStream im Allgemeinen: [[https:~~/~~/docs.nats.io/nats-concepts/jetstream>>url:https://docs.nats.io/nats-concepts/jetstream||shape="rect"]]
131 * JetStream Streams: [[https:~~/~~/docs.nats.io/nats-concepts/jetstream/streams>>url:https://docs.nats.io/nats-concepts/jetstream/streams||shape="rect"]]
132
133 Wir legen nun also ein Stream namens **backend_events** an:
134
135 {{code language="shell"}}
136 nats stream add backend_events --subjects "mksp.>" --retention work --max-age 7d --storage file --defaults
137 {{/code}}
138
139 === JetStream Consumer ===
140
141 Consumer sind eine Auswahl / Ansicht von Teilen eines Streams. Details finden sich wieder in der NATS Dokumentation: [[https:~~/~~/docs.nats.io/nats-concepts/jetstream/consumers>>url:https://docs.nats.io/nats-concepts/jetstream/consumers||shape="rect"]]
142
143 Consumer können entweder persistent sein (dauerhaft, ein Client kann dann den bestehenden Consumer neben) oder flüchtig (ephemeral), in diesem Fall wird der Consumer beim Erstellen des Clients (beispielsweise durch N8n) angelegt. In der Regel werden Consumer durch einen Filter auf bestimmte Subjects beschränkt, diese dürfen sich nicht überschneiden. Da die NATS JavaScript Client Library (die von n8n verwendet wird) es leider nicht sinnvoll erlaubt, ephemeral Consumer anzulegen verwenden wir dauerhafte Consumer-Namen.
144
145 |=(((
146 Backend Event
147 )))|=(((
148 Consumer Name
149 )))|=(((
150 Schema
151 )))
152 |(((
153 {{{mksp.backend.briefing.offer.cancelled}}}
154
155 (((
156
157 )))
158 )))|(((
159 backend_briefing_offer_cancelled
160 )))|(((
161 (% class="nc" %)##BriefingOfferEvent##
162 )))
163 |(((
164 {{{mksp.backend.briefing.offer.created}}}
165
166 (((
167
168 )))
169 )))|(((
170 backend_briefing_offer_created
171 )))|(((
172 (% class="nc" %)##BriefingOfferEvent##
173 )))
174 |(((
175 {{{mksp.backend.briefing.offer.updated}}}
176 )))|(((
177 backend_briefing_offer_updated
178 )))|(((
179 (% class="nc" %)##BriefingOfferEvent##
180 )))
181 |(((
182 {{{mksp.backend.briefing.created}}}
183
184 (((
185
186 )))
187 )))|(((
188 backend_briefing_created
189 )))|(((
190 (% class="n" %)##BriefingEvent##
191 )))
192 |(((
193 {{{mksp.backend.key.assigned}}}
194 )))|(((
195 backend_key_assigned
196 )))|(((
197 (% class="nc" %)##KeyEvent##
198 )))
199 |(((
200 {{{mksp.backend.key.unassigned}}}
201 )))|(((
202 backend_key_unassigned
203 )))|(((
204 (% class="nc" %)##KeyEvent##
205 )))
206 |(((
207 {{{mksp.backend.storage.reserved}}}
208 )))|(((
209 backend_storage_reserved
210 )))|(((
211 (% class="nc" %)##StorageSpaceEvent##
212 )))
213 |(((
214 mksp.backend.storage.released
215 )))|(((
216 backend_storage_released
217 )))|(((
218 (% class="nc" %)##StorageSpaceEvent##
219 )))
220 |(((
221 mksp.backend.storage.expired
222 )))|(((
223 backend_storage_expired
224 )))|(((
225 (% class="nc" %){{code language="none"}}StorageSpaceEvent{{/code}}
226 )))
227 |(((
228 mksp.backend.user.converted_to_member
229 )))|(((
230 backend_user_converted_to_member
231 )))|(((
232 (% class="nc" %){{code language="none"}}UserEvent{{/code}}
233 )))
234 |(((
235 mksp.backend.user.converted_to_guest
236 )))|(((
237 backend_user_converted_to_guest
238 )))|(((
239 (% class="nc" %){{code language="none"}}UserEvent{{/code}}
240 )))
241 |(((
242 mksp.docuseal.document.signed
243 )))|docuseal_document_signed|(((
244 SignatureEvent
245 )))
246 |(((
247 mksp.docuseal.signature.expired
248 )))| |
249 |(((
250 mksp.docuseal.signature.completed
251 )))| |
252
253 Die Consumer können dann wie folgt angelegt werden:
254
255 {{code}}
256 $ nats consumer add backend_events <name> --pull --filter="<filter>" --defaults
257
258 # Beispiel
259 nats consumer add backend_events backend_briefing_offer_cancelled --pull --filter="mksp.backend.briefing.offer.cancelled" --defaults
260 nats consumer add backend_events backend_key_assigned --pull --filter="mksp.backend.key.assigned" --defaults
261 {{/code}}
262
263 = {{id name="NATSSetup-Troubleshooting"/}}Troubleshooting =
264
265 Man kann über die NATS CLI mit dem Server interagieren.
266
267 == {{id name="NATSSetup-JetStream"/}}JetStream ==
268
269 === {{id name="NATSSetup-Streams"/}}Streams ===
270
271 Man kann sich die vorhandenen Streams und die Anzahl der Nachrichten anschauen. Die Anzahl sollte üblicherweise 0 sein, wenn die Zahl höher ist, werden Events nicht richtig abgearbeitet.
272
273 {{code language="shell"}}
274 $ nats context select mksp-jsadmin
275 $ nats stream ls
276 ╭─────────────────────────────────────────────────────────────────────────────────────╮
277 │ Streams │
278 ├────────────────┬─────────────┬─────────────────────┬──────────┬──────┬──────────────┤
279 │ Name │ Description │ Created │ Messages │ Size │ Last Message │
280 ├────────────────┼─────────────┼─────────────────────┼──────────┼──────┼──────────────┤
281 │ backend_events │ │ 2025-09-08 08:36:19 │ 0 │ 0 B │ never │
282 ╰────────────────┴─────────────┴─────────────────────┴──────────┴──────┴──────────────╯
283 {{/code}}
284
285 Ebenso kann man sich - falls vorhandenen - die Nachrichten im Stream anzeigen lassen:
286
287 {{code language="shell"}}
288 $ nats context select mksp-jsadmin
289 $ nats stream view backend_events
290 [1] Subject: mksp.backend.storage.reserved Received: 2025-09-08 09:06:00
291
292 {"timestamp":"2025-09-08T09:06:00.062358","stora
293 {{/code}}
294
295 === {{id name="NATSSetup-Consumer"/}}Consumer ===
296
297 Die bestehenden Consumer kann man sich ebenfalls im Jetstream Context anzeigen lassen:
298
299 {{code}}
300 $ nats consumer ls backend_events                       
301 ╭──────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
302 │ Consumers │
303 ├──────────────────────────────────┬─────────────┬─────────────────────┬─────────────┬─────────────┬───────────────┤
304 │ Name │ Description │ Created │ Ack Pending │ Unprocessed │ Last Delivery │
305 ├──────────────────────────────────┼─────────────┼─────────────────────┼─────────────┼─────────────┼───────────────┤
306 │ backend_briefing_created │ │ 2025-09-10 08:30:12 │ 0 │ 12 │ never │
307 │ backend_briefing_offer_cancelled │ │ 2025-09-10 08:21:02 │ 0 │ 0 │ never │
308 │ backend_briefing_offer_created │ │ 2025-09-10 08:28:25 │ 0 │ 0 │ never │
309 │ backend_briefing_offer_updated │ │ 2025-09-10 08:29:19 │ 0 │ 0 │ never │
310 │ backend_key_assigned │ │ 2025-09-10 08:21:04 │ 0 │ 0 │ never │
311 ╰──────────────────────────────────┴─────────────┴─────────────────────┴─────────────┴─────────────┴───────────────╯
312 {{/code}}
313
314 In dieser Tabelle sieht man bereits die "unprocessed" Nachrichten. Ist diese Zahl größer als 0 s sieht also gut, wenn n8n die Nachrichten nicht korrekt verarbeitet. Man kann sich ebenfalls die Details eines Consumers anzeigen lassen:
315
316 {{code}}
317 $ nats consumer info backend_events backend_briefing_offer_cancelled
318
319 Information for Consumer backend_events > backend_briefing_offer_cancelled created 2025-09-09 22:53:26
320
321 Configuration:
322
323 Name: backend_briefing_offer_cancelled
324 Pull Mode: true
325 Filter Subject: mksp.backend.briefing.offer.cancelled
326 Deliver Policy: All
327 Ack Policy: Explicit
328 Ack Wait: 30.00s
329 Replay Policy: Instant
330 Max Ack Pending: 1,000
331 Max Waiting Pulls: 512
332
333 State:
334
335 Host Version: 2.11.8
336 Required API Level: 0 hosted at level 1
337 Last Delivered Message: Consumer sequence: 0 Stream sequence: 10
338 Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
339 Outstanding Acks: 0 out of maximum 1,000
340 Redelivered Messages: 0
341 Unprocessed Messages: 0
342 Waiting Pulls: 1 of maximum 512
343 {{/code}}
344
345 Insbesondere die letzten beiden Zeilen sind wichtig:
346
347 * **Unprocessed Messages** sind Nachrichten, die noch nicht abgerufen wurden. Dies sollte immer 0 sein
348 * **Waiting Pulls**: die Anzahl der verbundenen Clients. Üblicherweise sollte das nur n8n sein, also immer 1.
349
350 == {{id name="NATSSetup-Server"/}}Server ==
351
352 Man kann sich ebenfalls die aktiven Verbindungen anzeigen:
353
354 {{code language="shell"}}
355 $ nats context select mksp-sysadmin
356 $ nats server report connections   
357
358 ╭─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
359 │ Top 6 Connections out of 6 by subs │
360 ├─────┬──────────────────────────┬──────────────┬─────────┬───────────────────┬─────────┬──────────┬─────────┬──────────┬──────────┬───────────┬──────┤
361 │ CID │ Name │ Server │ Cluster │ IP │ Account │ Uptime │ In Msgs │ Out Msgs │ In Bytes │ Out Bytes │ Subs │
362 ├─────┼──────────────────────────┼──────────────┼─────────┼───────────────────┼─────────┼──────────┼─────────┼──────────┼──────────┼───────────┼──────┤
363 │ 83 │ backend_kicker │ mksp-nats-01 │ │ 172.18.0.33:52808 │ MKSP │ 2h24m18s │ 0 │ 0 │ 0 B │ 0 B │ 0 │
364 │ 93 │ backend_scheduler │ mksp-nats-01 │ │ 172.18.0.29:47620 │ MKSP │ 2h23m23s │ 0 │ 0 │ 0 B │ 0 B │ 0 │
365 │ 82 │ backend_fastapi │ mksp-nats-01 │ │ 172.18.0.33:52798 │ MKSP │ 2h24m18s │ 1 │ 1 │ 345 B │ 838 B │ 1 │
366 │ 94 │ backend_worker │ mksp-nats-01 │ │ 172.18.0.37:53550 │ MKSP │ 2h23m23s │ 0 │ 0 │ 0 B │ 0 B │ 1 │
367 │ 95 │ backend_worker │ mksp-nats-01 │ │ 172.18.0.37:53554 │ MKSP │ 2h23m23s │ 0 │ 0 │ 0 B │ 0 B │ 1 │
368 │ 111 │ NATS CLI Version 0.2.4 │ mksp-nats-01 │ │ 10.1.253.1:50712 │ SYS │ 0s │ 2 │ 1 │ 210 B │ 944 B │ 1 │
369 ├─────┼──────────────────────────┼──────────────┼─────────┼───────────────────┼─────────┼──────────┼─────────┼──────────┼──────────┼───────────┼──────┤
370 │ │ Totals for 6 connections │ │ │ │ │ │ 3 │ 2 │ 555 B │ 1.7 KiB │ 4 │
371 ╰─────┴──────────────────────────┴──────────────┴─────────┴───────────────────┴─────────┴──────────┴─────────┴──────────┴──────────┴───────────┴──────╯
372
373 ╭──────────────────────────────────────╮
374 │ Connections per server │
375 ├──────────────┬─────────┬─────────────┤
376 │ Server │ Cluster │ Connections │
377 ├──────────────┼─────────┼─────────────┤
378 │ mksp-nats-01 │ │ 6 │
379 ╰──────────────┴─────────┴─────────────╯
380 {{/code}}
381
382 Der Name ist hierbei frei vergeben, in unserem Fall im Backend Code. Wir erwarten die folgenden Clients:
383
384 |=(((
385 Client
386 )))|=(((
387 Anzahl
388 )))|=(((
389 Komponente
390 )))|=(((
391 Art
392 )))|=(((
393 Beschreibung
394 )))
395 |(((
396 backend_kicker
397 )))|(((
398 1
399 )))|(((
400 Taskiq
401 )))|(((
402 Core
403 )))|(((
404 Gibt dem Backend die Möglichkeit, asynchrone Tasks an das Taskiq backend zu delegieren
405 )))
406 |(((
407 backend_scheduler
408 )))|(((
409 1
410 )))|(((
411 Taskiq
412 )))|(((
413 Core
414 )))|(((
415 Komponente welches die regelmäßigen Jobs überwacht und triggert
416 )))
417 |(((
418 backend_fastapi
419 )))|(((
420 1
421 )))|(((
422 FastAPI
423 )))|(((
424 JetStream
425 )))|(((
426 Das Backend kann hiermit Events ausgeben, die dann von N8n abgearbeitet werden.
427 )))
428 |(((
429 backend_worker
430 )))|(((
431 2
432 )))|(((
433 Taskiq
434 )))|(((
435 Core
436 )))|(((
437 Worker für asynchrone Tasks
438 )))
439
440 Man kann sich ebenfalls die Subscriptions der jeweiligen Connections anschauen. Hierfür muss die Ausgabe auf JSON umgestellt werden:
441
442 {{code language="shell"}}
443 $ nats context select mksp-jsadmin
444 $ nats server report connections -j
445 [
446 {
447 "cid": 95,
448 "kind": "Client",
449 "type": "nats",
450 "ip": "172.18.0.37",
451 "port": 53554,
452 "start": "2025-09-08T13:35:37.398441269Z",
453 "last_activity": "2025-09-08T13:35:37.434868631Z",
454 "rtt": "479µs",
455 "uptime": "2h27m5s",
456 "idle": "2h27m5s",
457 "pending_bytes": 0,
458 "in_msgs": 0,
459 "out_msgs": 0,
460 "in_bytes": 0,
461 "out_bytes": 0,
462 "subscriptions": 1,
463 "name": "backend_worker",
464 "lang": "python3",
465 "version": "2.11.0",
466 "tls_version": "1.3",
467 "tls_cipher_suite": "TLS_AES_128_GCM_SHA256",
468 "tls_peer_certs": [
469 {
470 "subject": "CN=backend.mksp-da.de,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE",
471 "spki_sha256": "1c7826ac936267622a5bfec0359304e795a4fd552b4d76556d7fcbddf51f43fb",
472 "cert_sha256": "e25dd3084b13f5f3cb81ebb9291627bcd0a25ab51325f1cc627a2b758f03d1ca"
473 },
474 {
475 "subject": "CN=MKSP NATS Issuing CA,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE",
476 "spki_sha256": "af16c3490475a4ba65d273b7dae825622b4c1eeca5f3da808df14b3c298627a7",
477 "cert_sha256": "10ccebe5826128f50f7b007537fa8b832326d4bb14b1676588abac9a10bf7585"
478 }
479 ],
480 "authorized_user": "CN=backend.mksp-da.de,OU=Makerspace Darmstadt IT,O=Makerspace Darmstadt e.V.,C=DE",
481 "account": "MKSP",
482 "subscriptions_list": [
483 "taskiq_tasks"
484 ],
485 "name_tag": "MKSP",
486 "server": {
487 "name": "mksp-nats-01",
488 "host": "0.0.0.0",
489 "id": "NDH7ZAPFJ5EORJKZDJGOUNN3NDMTLWR6V7BGBOYUIG5UE7VEIK76F6WH",
490 "ver": "2.11.6",
491 "jetstream": true,
492 "flags": 7,
493 "seq": 2937,
494 "time": "2025-09-08T16:02:42.888686229Z"
495 }
496 },
497 ]
498 {{/code}}
499
500