Azure Stream Analytics, prosty przypadek
Azure Stream Analytics to chyba jeden z największych "kombajnów" na Azure, a przynajmniej im głębiej w niego
zaglądam tym więcej ciekawostek znajduję. Ograniczę się jednak na razie do przypadków IoT.
Czym jest Stream Analytics?
Azure Stream Analytics to usługa pozwalająca na skomplikowane przetwarzanie w czasie rzeczywistym zdarzeń. A
właściwie jest to silnik do przetwarzania zdarzeń, ponieważ może być uruchamiany również na urządzeniach IoT Edge,
nie tylko w Azure jakoś usługa PaaS. Stream Analytics potrafi przetwarzać duże wolumeny danych streamingowych
z wielu równoległych źródeł. Świetnie sprawdza się we wszystkich przypadkach danych IoT czy to będzie telemetria czy
dane analityczne z autonomicznych samochodów. Ale również poradzi sobie z danymi streamingowymi np. wykrywania
anomalii giełdowych, czy przetwarzaniem logów czy analityk aplikacji.
Create new
Stream Analytics jest naszym silnikiem do uruchamiania pojedynczych 'zadań'. Jednostek obliczeniowych. Czyli Stream
Analytics Job, czyli taką usługę sobie tworzymy. Wiele nie potrzeba, nazwa, subskrypcja, lokalizacja. Mamy też do
wyboru czy jest to usługa w chmurze, czy też będziemy sobie ją uruchamiać bezpośrednio na urządzeniach.
Streaming units. To dość ważne ustawienie. Stream Analytics zapewnia nam bardzo proste skalowanie każdego joba, po
prostu dodajemy Streaming Unit. Jeden SU kosztuje 0.12$/h, czyli około 88$ na miesiąc. Proponowane 6 SU na początek
jest dość dużą wartością, a ile jednostek potrzebujemy musimy określić na podstawie ilości danych i sposobu ich
obróbki. Jest to na tyle skomplikowane, że w dokumentacji jest osobna sekcja Leverage query parallelization in Azure Stream Analytics
Wjeżdżają nam tutaj dodatkowo partycjonowania (w końcu wiadomo po co nam partycje i kto to jest reader - w tym
wypadku to jeden SU). I tak, jeśli mamy jeden krok w Query Joba to maksymalnie możemy mieć 6 SU, ale jeśli czytamy
dane ze źródła, które ma 16 partycji dla tego samego joba możemy mieć 96 SU. Warto się zagłębić w
dokumentację.
Główne składowe
Na podsumowaniu naszego Joba widzimy najważniejsze składowe: Inputs, Outputs, Query. Tutaj również na szybko job
będzie informował, gdy coś jest nie tak z Inputami lub Outputami.
W prostym przykładzie odtwórzmy sobie ścieżkę zimną, czyli zapisywanie danych z IoT Huba do Blob Storage tak
jak one do nas przychodzą lub delikatnie zmienione.
Input
Ponieważ przetwarzamy stream danych to do wyboru mamy tylko usługi, które takie dane zwracają: Event Hub, IoT Hub
oraz Blob storage. Dodatkowo możemy używać danych referencyjnych przechowywanych w Blob storage lub bazie SQL
Wybieramy więc nowy input IoTHub i wypełniamy.
Pierwsze pole Input alias - to alias, którym potem będziemy się posługiwać w naszym zapytaniu, warto więc aby był
znaczący, ale też w miarę prosty. Dalej wyszukujemy sobie nasz IoT Hub, jego endpoint i łączymy po uprawnieniach.
I teraz robi się ciekawie. Consumer group - na wbudowanym endpoincie to grupa, której będzie używać ten Job do
czytania danych z naszego IoT Huba. Ja tutaj wybieram "secondgroup", przypominam poniżej również skąd się to wzięło.
Ciekawostką jest, że dostajemy tutaj podpowiedź, aby stworzyć osobną grupę per job ponieważ IoT Hub umożliwia tylko
5 readerów per grupę. Oczywiście jest to cenna podpowiedź i warto się do niej stosować, również dlatego że chcemy,
aby nasz job przetwarzał wszystkie przychodzące wiadomości, a nie konkurował z innymi readerami o dane.
Dalej mamy jeszcze dodatkowe opcje wynikające z wielości danych przyjmowanych. Możemy sobie ustawić typ kompresji
oraz posługiwać się innymi formatami danych niż tylko JSON i AVRO.
Outputs
Mamy tutaj dużo większy wybór. Usługi obsługujące dalej zdarzenia jak Event Hub, Servis Bus, Servis Bus Topic,
Azure Functions oraz wybór usług do przechowywania Power BI, Data Lake Storage, Blob Storage, Table Storage, SQL
Database oraz Cosmos DB. Przy Cosmos DB trzeba zaznaczyć, że na razie obsługiwane jest tylko API SQL
My wybieramy Blob Storage. I jedziemy przez konfiguracje. Tworzymy sobie osobny kontener. Dalej możemy wybrać
format ścieżki, w której będą zapisywane nasze pliki i rozmiar batcha czy to ilość wierszy czy też czas, po którym
partia danych i tak się zapisze. Żadnych nowości.
Query
Mając wejścia oraz wyjścia wystarczy przetworzyć jakoś dane z jednego końca na drugi.
Stream Analytics ma swój język zapytań bardzo podobny do zwykłego SQLa. W najprostszej wersji mamy co=SELECT,
skąd=FROM, dokąd=INTO.
Po standardowe wzorce zapytań zajrzyj na Stream
analytics query patterns, bo trochę tego jest :)
Okno zapytania. Po lewej mamy dostępne wszystkie wejścia i wyjścia. Jeśli mamy dostępne dane w elemencie
wejściowym, jak w tym przypadku w iotHub, mamy dodatkową ikonkę wskazującą na dokument. Możemy te dane przeglądać w
postaci tabeli lub w postaci surowych danych, w tym przypadku JSON. Jest to bardzo pomocne, jeśli chcemy sobie
ograniczyć pola, które będziemy wybierać lub, jak w tym przypadku, jeśli chcemy spłycić strukturę obiektu. Najpierw
biorę wszystkie pola, a potem jeszcze wszystkie pola z obiektu IoTHub.
Gdy napiszemy satysfakcjonujące nas zapytanie możemy go przetestować (na danych przykładowych które wcześniej
zostały zaciągnięte). To tylko poglądowy efekt naszej pracy. W taki sposób możemy napisać zapytanie, które da nam
wartość. Gdy skończymy zapisujemy zapytanie.
Run Job
Mamy jeszcze jeden krok do zrobienia. Musimy uruchomić nasz Job. Całą konfigurację robimy zawsze na zatrzymanym
Jobie. Za zatrzymany Job również nie płacimy. Jak? Klikamy Start. Możemy skonfigurować, kiedy Job ma się uruchomić
albo uruchomić go od razu.
W naszym przypadku job pobiera dane z IoTHuba i zapisuje do storege. Pliki nazywane są w sposób unikatowy, a że nie
wybraliśmy żadnego wzorca to tak po prostu luzem. Działa też batch po 5 minutach, bo mamy w pliku 67 wierszy choć
nie widać tego na screenie, ale kto ciekawy może zajrzeć do pliku
tutaj.
Komentarze
Prześlij komentarz