scala - Writing to Oracle Database using Apache Spark 1.4.0


I am trying to write data to our Oracle database using Spark 1.4.0's DataFrame.write.jdbc() function.

The symmetric read.jdbc() function, which reads data from the Oracle database into DataFrame objects, works well. But writing a DataFrame back (I tried to write the same object I got from the database, setting overwrite to true) throws the following exception (ORA-00902 is Oracle's "invalid datatype" error; the message below happens to be localized in German):

Exception in thread "main" java.sql.SQLSyntaxErrorException: ORA-00902: ungültiger Datentyp
    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450)
    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399)
    at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1017)
    at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:655)
    at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:249)
    at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:566)
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:215)
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:58)
    at oracle.jdbc.driver.T4CPreparedStatement.executeForRows(T4CPreparedStatement.java:943)
    at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1075)
    at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3820)
    at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate(OraclePreparedStatement.java:3897)
    at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate(OraclePreparedStatementWrapper.java:1361)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:252)
    at Main3$.main(Main3.scala:72)
    at Main3.main(Main3.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

The table has 2 basic String columns. When they are Integer instead, it can write them.

Actually, when I go deeper, I realize that Spark maps StringType to "TEXT", which is not recognized by Oracle (it should be "VARCHAR2" instead). The code is the following, from jdbc.scala, which may be found on GitHub:

    def schemaString(df: DataFrame, url: String): String = {
      val sb = new StringBuilder()
      val dialect = JdbcDialects.get(url)
      df.schema.fields foreach { field => {
        val name = field.name
        val typ: String =
          dialect.getJDBCType(field.dataType).map(_.databaseTypeDefinition).getOrElse(
            field.dataType match {
              case IntegerType => "INTEGER"
              case LongType => "BIGINT"
              case DoubleType => "DOUBLE PRECISION"
              case FloatType => "REAL"
              case ShortType => "INTEGER"
              case ByteType => "BYTE"
              case BooleanType => "BIT(1)"
              case StringType => "TEXT"
              case BinaryType => "BLOB"
              case TimestampType => "TIMESTAMP"
              case DateType => "DATE"
              case DecimalType.Unlimited => "DECIMAL(40,20)"
              case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
            })
        val nullable = if (field.nullable) "" else "NOT NULL"
        sb.append(s", $name $typ $nullable")
      }}
      if (sb.length < 2) "" else sb.substring(2)
    }
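To make the failure concrete, here is a minimal sketch (my addition, not from the original post) that replays the relevant branches of that match for a two-String-column schema and prints the DDL column list Spark would generate; TEXT is not an Oracle datatype, which is exactly what ORA-00902 complains about:

    import org.apache.spark.sql.types._

    // Sketch: replay the StringType/IntegerType branches of schemaString above
    // for the schema used in this question. Table and column names are illustrative.
    val schema = StructType(
      StructField("one", StringType, true) ::
      StructField("two", StringType, true) :: Nil)

    val columns = schema.fields.map { field =>
      val typ = field.dataType match {
        case StringType  => "TEXT"    // invalid in Oracle -> ORA-00902
        case IntegerType => "INTEGER" // valid in Oracle, so Integer columns work
        case other       => s"<${other.simpleString}>"
      }
      s"${field.name} $typ"
    }.mkString(", ")

    println(s"CREATE TABLE students ($columns)")
    // prints: CREATE TABLE students (one TEXT, two TEXT)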

So my question is: am I mistaken somewhere, or does Spark SQL simply not support Oracle, and should I install a plug-in to use Spark SQL with Oracle?

My simple main is:

    val conf = new SparkConf().setAppName("Parser").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val reader = sqlContext.read
    val frame = reader.jdbc(url, "students", connectionProp)
    frame.printSchema()
    frame.show()

    val row = Row("3", "4")
    val struct =
      StructType(
        StructField("one", StringType, true) ::
        StructField("two", StringType, true) :: Nil)

    val arr = Array(row)
    val rddRow = sc.parallelize(arr)
    val dframe = sqlContext.createDataFrame(rddRow, struct)
    dframe.printSchema()
    dframe.show()

    dframe.write.jdbc(url, "students", connectionProp)

Actual answer: it's not possible to write to Oracle using the existing DataFrame.write.jdbc() implementation in 1.4.0. But if you don't mind upgrading to Spark 1.5, there is a slightly hackish way to do it, described here. There are 2 problems:

The easy one: Spark's way of checking for table existence is not compatible with Oracle:

    SELECT 1 FROM $table LIMIT 1
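As a hedged aside (my addition): Oracle has no LIMIT clause, its row-limiting predicate is ROWNUM, which is why this probe is rejected. Illustratively:

    // Spark's existence probe vs. an Oracle-valid equivalent; names are illustrative.
    val table = "students" // hypothetical table name
    val sparkProbe  = s"SELECT 1 FROM $table LIMIT 1"          // rejected by Oracle
    val oracleProbe = s"SELECT 1 FROM $table WHERE ROWNUM = 1" // Oracle syntax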

That can be avoided with the direct saveTable utility method:

    org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable(df, url, table, props)
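Note (my addition): JdbcUtils lives under org.apache.spark.sql.execution, i.e. it is internal Spark API, so its location and signature may change between releases. If I read the 1.5 source correctly, saveTable only INSERTs the DataFrame's rows and skips the existence probe, so the target table must already exist or be created separately.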

And the hard one (as you've guessed): there is no Oracle-specific data type dialect available out of the box. I adopted the solution from the same article:

    import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
    import org.apache.spark.sql.types._

    val OracleDialect = new JdbcDialect {
      override def canHandle(url: String): Boolean =
        url.startsWith("jdbc:oracle") || url.contains("oracle")

      override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
        case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
        case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
        case IntegerType => Some(JdbcType("NUMBER(10)", java.sql.Types.NUMERIC))
        case LongType => Some(JdbcType("NUMBER(19)", java.sql.Types.NUMERIC))
        case DoubleType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
        case FloatType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
        case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
        case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
        case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
        case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
        case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
    //  case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
        case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC))
        case _ => None
      }
    }

    JdbcDialects.registerDialect(OracleDialect)

So, finally, a working example should be similar to:

    val url: String = "jdbc:oracle:thin:@your_domain:1521/dbname"
    val driver: String = "oracle.jdbc.OracleDriver"
    val props = new java.util.Properties()
    props.setProperty("user", "username")
    props.setProperty("password", "userpassword")
    org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable(dataFrame, url, "table_name", props)
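For completeness, here is a hedged end-to-end sketch (my addition) of how the pieces above might fit together on Spark 1.5.x. It assumes the Oracle JDBC driver (ojdbc) is on the classpath, that the dialect-registration block above has already run, and that the target table already exists; names like "students" are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.types._

    val conf = new SparkConf().setAppName("OracleWrite").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Same two-String-column DataFrame as in the question.
    val struct = StructType(
      StructField("one", StringType, true) ::
      StructField("two", StringType, true) :: Nil)
    val dframe = sqlContext.createDataFrame(sc.parallelize(Seq(Row("3", "4"))), struct)

    // url and props as defined just above; the registered OracleDialect maps
    // StringType to VARCHAR2(255), so the insert no longer trips ORA-00902.
    org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
      .saveTable(dframe, url, "students", props)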
