Введение в Apache Spark

Введение в Apache Spark

684
ПОДЕЛИТЬСЯ

Привет, хабр!

Сейчас разглядим инструмент, который наиболее популярен и предназначен для обработки огромных обьемов данных — Apache Spark. Напомним, что индивидуальностью данного инструмента является то, что он дозволяет строить в первую очередь линейные модели (которые, к слову, имеют неплохую обобщающую способность), а высочайшее качество алгоритмов достигается за счет отбора и генерации признаков, регуляризации и иных доп приемов. В прошедший раз мы разглядели превосходный инструмент Vowpal Wabbit, который бывает полезен в вариантах, когда приходится учиться на подборках , не помещающихся в оперативную память.

рассматриваются главные понятия и философия Spark’а. В этом мануале не будет много кода, т.к. В последующих статьях (про MlLib и GraphX) мы возьмем какой-нибудь датасет и подробнее разглядим Spark на практике. Сосредоточимся на практических вещах. Не будем вдаваться в подробности истории появления данного инструмента, а также его внутреннего устройства. В данной статье мы разглядим базисные операции и главные вещи, которые можно делать в Spark’е, а в последующий раз разглядим подробнее библиотеку MlLib машинного обучения, а также GraphX для обработки графов (создатель данного поста в основном для этого и употребляет данный инструмент — это как раз тот вариант, когда часто граф нужно держать в оперативной памяти на кластере, в то время как для машинного обучения чрезвычайно нередко довольно Vowpal Wabbit’а).

чрезвычайно комфортно работать конкретно в IPython Notebook, выгружая маленькую часть данных из кластера и обрабатывая, к примеру, пакетом Pandas — выходит достаточно комфортная связка Сходу скажем, что нативно Spark поддерживает Scala, Python и Java. Примеры будем разглядывать на Python, т.к.

Итак, начнем с того, что главным понятием в Spark’е является RDD (Resilient Distributed Dataset), который представляет собой Dataset, над которым можно делать преобразования 2-ух типов (и, соответственно, вся работа с этими структурами заключается в последовательности этих 2-ух действий).

Трансформации
Как правило, это операции, которые каким-или образом преобразовывают элементы данного датасета. Вот неполный самых всераспространенных преобразований, каждое из которых возвращает новейший датасет (RDD): Результатом внедрения данной операции к RDD является новейший RDD.

.map(function) — применяет функцию function к каждому элементу датасета

.filter(function) — возвращает все элементы датасета, на которых функция function вернула истинное значение

.distinct([numTasks]) — возвращает датасет, который содержит неповторимые элементы начального датасета

Также стоит отметить о операциях над множествами, смысл которых понятен из заглавий:

.union(otherDataset)

.intersection(otherDataset)

.cartesian(otherDataset) — новейший датасет содержит в для себя различные пары (A,B), где 1-ый элемент принадлежит начальному датасету, а 2-ой — датасету-аргументу

Деяния
Вот перечень самых всераспространенных действий, которые можно использовать над RDD: Деяния используются тогда, когда нужно материализовать итог — как правило, сохранить данные на диск, или вывести часть данных в консоль.

.saveAsTextFile(path) — сохраняет данные в текстовый файл (в hdfs, на локальную машинку либо в всякую другую поддерживаемую файловую систему — полный перечень можно поглядеть в документации)

Как правило, это применяется в вариантах, когда данных в датасете уже не достаточно (использованы разные фильтры и преобразования) — и нужна визуализация, или доп анализ данных, к примеру средствами пакета Pandas .collect() — возвращает элементы датасета в виде массива.

.take(n) — возвращает в виде массива 1-ые n частей датасета

.count() — возвращает количество частей в датасете

Из механизма данной операции следует, что функция function (которая воспринимает на вход 2 аргумента возвращает одно значение) обязана быть непременно коммутативной и ассоциативной .reduce(function) — знакомая операция для тех, кто знаком с MapReduce.

Это базы, которые нужно знать при работе с инвентарем. Сейчас незначительно займемся практикой и покажем, как загружать данные в Spark и делать с ними обыкновенные вычисления

При запуске Spark, 1-ое, что нужно сделать — это сделать SparkContext (раз говорить простыми словами — это обьект, который отвечает за реализацию наиболее низкоуровневых операций с кластером — подробнее — см. документацию), который при запуске Spark-Shell создается автоматом и доступен сходу (обьект sc)

Загрузка данных
Загружать данные в Spark можно 2-мя способами:

а). Конкретно из локальной программы с помощью функции .parallelize(data)

localData = [5,7,1,12,10,25]
ourFirstRDD = sc.parallelize(localData)
б). Из поддерживаемых хранилищ (к примеру, hdfs) с помощью функции .textFile(path)

ourSecondRDD = sc.textFile("path to some data on the cluster")
В этом пт принципиально отметить одну изюминка хранения данных в Spark’e и в тоже время самую полезную функцию .cache() (частично благодаря которой Spark стал так популярен), которая дозволяет закэшировать данные в оперативной памяти (с учетом доступности крайней). Это дозволяет создавать итеративные вычисления в оперативной памяти, тем самым избавившись от IO-overhead’а. Это в особенности принципиально в контексте машинного обучения и вычислений на графах, т.к. большая часть алгоритмов итеративные — начиная от градиентных способов, заканчивая таковыми методами, как PageRank

Работа с данными
Опосля загрузки данных в RDD мы можем делать над ним разные трансформации и деяния, о которых говорилось выше. К примеру:

Поглядим 1-ые несколько частей:

for item in ourRDD.top(10):
print item
Или сходу загрузим эти элементы в Pandas и будем работать с DataFrame’ом:

import pandas as pd
pd.DataFrame(ourRDD.map(lambda x: x.split(";")[:]).top(10))
Вообщем, как видно, Spark так комфортен, что далее, наверняка нет смысла писать разные примеры, а можно просто бросить это упражнение читателю — почти все вычисления пишутся практически в несколько строк

Как просто додуматься, сделать это можно, к примеру, с помощью функции .reduce(): Напоследок, покажем только пример трансформации, а конкретно, вычислим наибольший и малый элементы нашего датасета.

localData = [5,7,1,12,10,25]
ourRDD = sc.parallelize(localData)
print ourRDD.reduce(max)
print ourRDD.reduce(min)
Как уже было отмечено, в последующий раз мы разглядим тщательно библиотеку MlLib и раздельно — GraphX habrahabr.ru Итак, мы разглядели главные понятия, нужные для работы с инвентарем. Мы не разглядывали работу с SQL, работу с парами <ключ, значение> (что делается просто — для этого довольно поначалу применить к RDD, к примеру, фильтр, чтоб выделить, ключ, а далее — уже просто воспользоваться встроенными функциями, вроде sortByKey, countByKey, join и др.) — читателю предлагается ознакомиться с сиим без помощи других, а при появлении вопросцев — написать в комменты.