2015-07-14 4 views
1

У меня есть rdd, который распределяется по нескольким машинам в искровой среде. Я хотел бы выполнить функцию на каждом рабочем компьютере на этом rdd. Я не хочу собирать rdd, а затем выполнять функцию в драйвере. Функция должна выполняться отдельно для каждого исполнителя для собственного rdd. Как я могу это сделатьЗадача apache spark running на каждом rdd

Update (добавление кода) Я бегу все это в свече оболочки

import org.apache.spark.sql.cassandra.CassandraSQLContext 
import java.util.Properties 

val cc = new CassandraSQLContext(sc) 
val rdd = cc.sql("select * from sams.events where appname = 'test'"); 
val df = rdd.select("appname", "assetname"); 

Здесь у меня есть ФР с 400 строками. Мне нужно сохранить эту таблицу df в sql server. Когда я пытаюсь использовать df.write метод дает мне ошибки, которые я разместил в отдельном потоке spark dataframe not appending to the table

я могу открыть DriverManager и вставить СВЯЗЬ строк, но это будет сделано в модуле драйвера искры

import java.sql._ 
import com.microsoft.sqlserver.jdbc.SQLServerDriver 
// create a Statement from the connection 
Statement statement = conn.createStatement(); 

// insert the data 
statement.executeUpdate("INSERT INTO Customers " + "VALUES (1001, 'Simpson', 'Mr.', 'Springfield', 2001)"); 
String connectionUrl = "jdbc:sqlserver://localhost:1433;" + 
    "databaseName=AdventureWorks;user=MyUserName;password=*****;"; 
Connection con = DriverManager.getConnection(connectionUrl); 

Мне нужно сделать это на машинке исполнителя. Как я могу это достичь?

+1

Это нормальная операция искры. Вы бы опубликовали код, над которым работаете? Вам может потребоваться некоторое руководство в хорошем направлении. – maasg

+0

У меня есть dataframe, который содержит коллекции строк sql. Теперь я получаю этот dataframe от cassandracql. Я обрабатываю этот фрейм данных и должен писать этот файл данных на сервер sql. Я не могу использовать jdbc метод dataframewriter, поэтому я использую insert в statement. – Nipun

+0

после прочтения об искрах, я прочитал, что код, как правило, работает в драйвере, и какая-то операция, которую необходимо выполнить, выполняется на исполнителе. – Nipun

ответ

0

Для того, чтобы настройки подключений от рабочих к другим системам, мы должны использовать rdd.foreachPartitions(iter => ...)

foreachPartitions позволяет выполнить операцию для каждого раздела, предоставляя доступ к данным раздела в качестве локального итератора. С достаточным количеством данных для раздела время установки ресурсов (например, подключений db) амортизируется с использованием таких ресурсов по всему разделу.

аннотация, например.

rdd.foreachPartition(iter => 
    //setup db connection 
    val dbconn = Driver.connect(ip, port) 
    iter.foreach{element => 
     val query = makeQuery(element) 
     dbconn.execute(query) 
    } 
    dbconn.close 
} 

Также можно создать однопользовательских менеджеров ресурсов, которые управляют этими ресурсами для каждой JVM кластера. См. Также этот ответ для полного примера такого локального менеджера ресурсов: spark-streaming and connection pool implementation

+0

Большое спасибо maasg. У меня есть данные из соединителя Cassandra, который заполняется в кадре данных. Я обрабатываю его, и теперь мне нужно написать его sqlserver. Для этого я создал отдельный фрейм данных. Вы знаете, как я могу скопировать один файл данных в другой, а затем записать его в sqlserver? – Nipun

Смежные вопросы