Skip to content

Commit

Permalink
Adding Oracle and Teradata sources.
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelbranco80 committed May 31, 2024
1 parent 8d54269 commit 7b1eaa4
Show file tree
Hide file tree
Showing 28 changed files with 1,001 additions and 12 deletions.
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ lazy val sources = (project in file("sources"))
postgresqlDeps,
mysqlDeps,
mssqlDeps,
snowflakeDeps
snowflakeDeps,
oracleDeps,
teradataDeps
)
)

Expand Down
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ object Dependencies {
val mysqlDeps = "com.mysql" % "mysql-connector-j" % "8.1.0-rawlabs"
val mssqlDeps = "com.microsoft.sqlserver" % "mssql-jdbc" % "7.0.0.jre10"
val snowflakeDeps = "net.snowflake" % "snowflake-jdbc" % "3.13.33"
val oracleDeps = "com.raw-labs" % "ojdbc7" % "12.1.0.1.0"
val teradataDeps = "com.raw-labs" % "terajdbc4" % "17.00.00.03"
val icuDeps = "com.ibm.icu" % "icu4j" % "73.2"
val poiDeps = Seq(
"org.apache.poi" % "poi" % "5.2.3",
Expand Down
15 changes: 12 additions & 3 deletions sources/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
requires org.postgresql.jdbc;
requires com.microsoft.sqlserver.jdbc;
requires mysql.connector.j;

requires raw.utils;
requires raw.client;

Expand All @@ -77,6 +78,8 @@
exports raw.sources.jdbc.snowflake;
exports raw.sources.jdbc.sqlite;
exports raw.sources.jdbc.sqlserver;
exports raw.sources.jdbc.oracle;
exports raw.sources.jdbc.teradata;

opens raw.creds.api to
com.fasterxml.jackson.databind;
Expand Down Expand Up @@ -108,7 +111,9 @@
raw.sources.jdbc.pgsql.PostgresqlLocationBuilder,
raw.sources.jdbc.snowflake.SnowflakeLocationBuilder,
raw.sources.jdbc.sqlite.SqliteLocationBuilder,
raw.sources.jdbc.sqlserver.SqlServerLocationBuilder;
raw.sources.jdbc.sqlserver.SqlServerLocationBuilder,
raw.sources.jdbc.oracle.OracleLocationBuilder,
raw.sources.jdbc.teradata.TeradataLocationBuilder;

uses raw.sources.jdbc.api.JdbcSchemaLocationBuilder;

Expand All @@ -117,7 +122,9 @@
raw.sources.jdbc.pgsql.PostgresqlSchemaLocationBuilder,
raw.sources.jdbc.snowflake.SnowflakeSchemaLocationBuilder,
raw.sources.jdbc.sqlite.SqliteSchemaLocationBuilder,
raw.sources.jdbc.sqlserver.SqlServerSchemaLocationBuilder;
raw.sources.jdbc.sqlserver.SqlServerSchemaLocationBuilder,
raw.sources.jdbc.oracle.OracleSchemaLocationBuilder,
raw.sources.jdbc.teradata.TeradataSchemaLocationBuilder;

uses raw.sources.jdbc.api.JdbcTableLocationBuilder;

Expand All @@ -126,5 +133,7 @@
raw.sources.jdbc.pgsql.PostgresqlTableLocationBuilder,
raw.sources.jdbc.snowflake.SnowflakeTableLocationBuilder,
raw.sources.jdbc.sqlite.SqliteTableLocationBuilder,
raw.sources.jdbc.sqlserver.SqlServerTableLocationBuilder;
raw.sources.jdbc.sqlserver.SqlServerTableLocationBuilder,
raw.sources.jdbc.oracle.OracleTableLocationBuilder,
raw.sources.jdbc.teradata.TeradataTableLocationBuilder;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ raw.sources.jdbc.sqlite.SqliteLocationBuilder
raw.sources.jdbc.snowflake.SnowflakeLocationBuilder
raw.sources.jdbc.pgsql.PostgresqlLocationBuilder
raw.sources.jdbc.mysql.MySqlLocationBuilder
raw.sources.jdbc.sqlserver.SqlServerLocationBuilder
raw.sources.jdbc.sqlserver.SqlServerLocationBuilder
raw.sources.jdbc.oracle.OracleLocationBuilder
raw.sources.jdbc.teradata.TeradataLocationBuilder
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ raw.sources.jdbc.sqlite.SqliteSchemaLocationBuilder
raw.sources.jdbc.snowflake.SnowflakeSchemaLocationBuilder
raw.sources.jdbc.pgsql.PostgresqlSchemaLocationBuilder
raw.sources.jdbc.mysql.MySqlSchemaLocationBuilder
raw.sources.jdbc.sqlserver.SqlServerSchemaLocationBuilder
raw.sources.jdbc.sqlserver.SqlServerSchemaLocationBuilder
raw.sources.jdbc.oracle.OracleSchemaLocationBuilder
raw.sources.jdbc.teradata.TeradataSchemaLocationBuilder
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ raw.sources.jdbc.sqlite.SqliteTableLocationBuilder
raw.sources.jdbc.snowflake.SnowflakeTableLocationBuilder
raw.sources.jdbc.pgsql.PostgresqlTableLocationBuilder
raw.sources.jdbc.mysql.MySqlTableLocationBuilder
raw.sources.jdbc.sqlserver.SqlServerTableLocationBuilder
raw.sources.jdbc.sqlserver.SqlServerTableLocationBuilder
raw.sources.jdbc.oracle.OracleTableLocationBuilder
raw.sources.jdbc.teradata.TeradataTableLocationBuilder
175 changes: 175 additions & 0 deletions sources/src/main/scala/raw/sources/jdbc/oracle/OracleClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright 2023 RAW Labs S.A.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0, included in the file
* licenses/APL.txt.
*/

package raw.sources.jdbc.oracle

import oracle.net.ns.NetException
import raw.utils.RawSettings
import raw.creds.api.OracleCredential
import raw.sources.jdbc.api._

import java.io.Closeable
import java.net.{ConnectException, SocketTimeoutException, UnknownHostException}
import java.sql.{Connection, DriverManager, SQLException}
import java.sql.Types._
import scala.collection.mutable
import scala.util.matching.Regex
import java.util.Properties
import java.util.concurrent.{Executors, TimeUnit}
import scala.util.control.NonFatal

/**
 * Patterns for the parameterised Oracle type names that cannot be recognised with a plain
 * string comparison because they embed a precision, e.g. `timestamp(6)`.
 *
 * The patterns are written in lower case; they are meant to be applied to a lower-cased
 * `data_type` value and must match the whole string when used as extractors.
 */
object OracleClient {

  /** `timestamp(<fractional seconds precision>)`. */
  val timestampRegex: Regex = new Regex("""timestamp\(\d+\)""")

  /** `interval year(<precision>) to month`. */
  val interval1Regex: Regex = new Regex("""interval year\(\d+\) to month""")

  /** `interval day(<precision>) to second(<precision>)`. */
  val interval2Regex: Regex = new Regex("""interval day\(\d+\) to second\(\d+\)""")

}

/**
 * JDBC client for an Oracle database described by an [[OracleCredential]].
 *
 * Connections are opened through the Oracle thin driver with the connect/read/network
 * timeouts supplied by the base client, and every `SQLException` is translated into one of
 * the project's location exceptions by [[wrapSQLException]].
 *
 * @param db       host, optional port, database name and optional user/password.
 * @param settings settings used by the base [[JdbcClient]].
 */
class OracleClient(db: OracleCredential)(implicit settings: RawSettings) extends JdbcClient {

  import OracleClient._

  // Load the driver class eagerly so that DriverManager can resolve the connection string.
  Class.forName("oracle.jdbc.OracleDriver")

  // Executor handed to Connection.setNetworkTimeout. It is shared by every connection opened by
  // this client instead of allocating a new single-thread executor per connection (which was
  // never shut down). Threads are created on demand, expire when idle and are daemons, so they
  // can never keep the JVM alive.
  private val networkTimeoutExecutor = Executors.newCachedThreadPool(
    new java.util.concurrent.ThreadFactory {
      override def newThread(task: Runnable): Thread = {
        val thread = new Thread(task, "oracle-network-timeout")
        thread.setDaemon(true)
        thread
      }
    }
  )

  override val vendor: String = "oracle"

  // Thin-driver URL of the form jdbc:oracle:thin:@<host>:<port>:<database>; port defaults to 1521.
  override val connectionString: String = {
    val port = db.port.map(p => ":" + p).getOrElse(":1521")
    s"jdbc:$vendor:thin:@${db.host}$port:${db.database}"
  }

  override val username: Option[String] = db.username
  override val password: Option[String] = db.password

  override val hostname: String = db.host
  override def database: Option[String] = Some(db.database)

  /**
   * Opens a new connection with connect, read and network timeouts applied.
   *
   * If the connection is established but configuring it fails, the connection is closed before
   * the failure is propagated so that it does not leak.
   *
   * @return an open connection; the caller is responsible for closing it.
   */
  override def getConnection: Connection = {
    // For connection pool:
    // wrapSQLException(datasource.getConnection())
    wrapSQLException {
      val props = new Properties()
      username.foreach(user => props.setProperty("user", user))
      password.foreach(passwd => props.setProperty("password", passwd))

      // This property is defined in interface oracle.jdbc.OracleConnection.CONNECTION_PROPERTY_THIN_NET_CONNECT_TIMEOUT
      // see https://docs.oracle.com/cd/E18283_01/appdev.112/e13995/oracle/jdbc/OracleConnection.html#CONNECTION_PROPERTY_THIN_READ_TIMEOUT
      props.setProperty("oracle.net.CONNECT_TIMEOUT", getConnectTimeout(TimeUnit.MILLISECONDS).toString)
      // oracle.jdbc.OracleConnection.CONNECTION_PROPERTY_THIN_READ_TIMEOUT
      props.setProperty("oracle.jdbc.ReadTimeout", getReadTimeout(TimeUnit.MILLISECONDS).toString)
      val conn = DriverManager.getConnection(connectionString, props)
      try {
        conn.setNetworkTimeout(networkTimeoutExecutor, getNetworkTimeout(TimeUnit.MILLISECONDS).toInt)
      } catch {
        // Cleanup-and-rethrow only: the original failure is always propagated unchanged.
        case failure: Throwable =>
          try conn.close()
          catch { case NonFatal(closeFailure) => failure.addSuppressed(closeFailure) }
          throw failure
      }
      conn
    }
  }

  /**
   * Lists the tables of a schema.
   *
   * @param schema schema name; upper-cased unless it starts with a double quote.
   */
  override def listTables(schema: String): Iterator[String] with Closeable = {
    // Compensate for ORACLE behaviour that requires the schema to be in upper case if it is not quoted.
    val sch = if (schema.startsWith("\"")) schema else schema.toUpperCase()
    super.listTables(sch)
  }

  /**
   * Reads the column names and types of a table from `ALL_TAB_COLUMNS`.
   *
   * Table and owner are compared case-insensitively and are passed as bind parameters, so names
   * containing quotes or other SQL syntax cannot alter the query.
   *
   * @param database    not used by this implementation.
   * @param maybeSchema owner of the table; must be defined (a missing schema fails with
   *                    `NoSuchElementException`).
   * @param table       table name.
   * @return the columns in `column_id` order; empty if no matching table is visible.
   */
  override def tableMetadata(database: Option[String], maybeSchema: Option[String], table: String): TableMetadata = {
    val schema = maybeSchema.get
    val conn = getConnection
    try {
      val stmt = wrapSQLException(
        conn.prepareStatement(
          """SELECT column_name, data_type, nullable, data_length, data_precision, data_scale
            |  FROM ALL_TAB_COLUMNS
            |  WHERE UPPER(table_name) = ? AND UPPER(owner) = ?
            |  ORDER BY column_id""".stripMargin
        )
      )
      try {
        wrapSQLException {
          stmt.setString(1, table.toUpperCase)
          stmt.setString(2, schema.toUpperCase)
        }
        val rs = wrapSQLException(stmt.executeQuery())
        val columns = mutable.ListBuffer[TableColumn]()
        while (wrapSQLException(rs.next())) {
          val nullable = wrapSQLException(rs.getString("nullable")) == "Y"
          val columnName = wrapSQLException(rs.getString("column_name"))
          val typeName = wrapSQLException(rs.getString("data_type"))
          columns += TableColumn(columnName, columnTypeFor(rs, typeName, nullable))
        }
        TableMetadata(columns.to, None)
      } finally {
        // Closing the statement also closes the result set obtained from it.
        stmt.close()
      }
    } finally {
      conn.close()
    }
  }

  // Maps the Oracle `data_type` of the current row of `rs` to a column type. For `number` the
  // precision and scale of the same row are read from `rs`. The result type is left inferred so
  // that it is exactly the common type of the column-type cases below.
  private def columnTypeFor(rs: java.sql.ResultSet, typeName: String, nullable: Boolean) = {
    // 1 when nullable, 0 otherwise (presumably ResultSetMetaData.columnNullable / columnNoNulls — confirm).
    val nullability = if (nullable) 1 else 0
    typeName.toLowerCase match {
      case "char" | "nchar" | "varchar2" | "nvarchar2" => JdbcColumnType(VARCHAR, nullability)
      case "float" =>
        // float is a sub-type of number but precision defined in bits instead of digits
        // so going for decimal, even though might be fair to go for double
        JdbcColumnType(DECIMAL, nullability)
      case "number" =>
        val precision = wrapSQLException(rs.getInt("data_precision"))
        // wasNull refers to the last column read, so it must directly follow the precision read.
        val precisionNull = wrapSQLException(rs.wasNull())
        val scale = wrapSQLException(rs.getInt("data_scale"))
        val jdbcType =
          if (scale != 0) DECIMAL
          else if (precisionNull) INTEGER
          else if (precision < 5) SMALLINT
          else if (precision < 10) INTEGER
          else if (precision < 20) BIGINT
          else DECIMAL
        JdbcColumnType(jdbcType, nullability)
      case "date" => JdbcColumnType(DATE, nullability)
      // NOTE(review): Oracle documents LONG as a character datatype; mapping it to INTEGER looks
      // suspicious and is kept only to preserve existing behaviour — confirm.
      case "long" => JdbcColumnType(INTEGER, nullability)
      case "binary_float" => JdbcColumnType(REAL, nullability)
      case "binary_double" => JdbcColumnType(DOUBLE, nullability)
      case timestampRegex() => JdbcColumnType(TIMESTAMP, nullability)
      case interval1Regex() | interval2Regex() => NativeIntervalType(nullable)
      case "raw" => JdbcColumnType(BLOB, nullability)
      case "blob" => JdbcColumnType(BLOB, nullability)
      case _ => UnsupportedColumnType
    }
  }

  /**
   * Evaluates `f`, translating failures into the project's location exceptions.
   *
   *  - `SQLException` caused by an Oracle `NetException` whose own cause is an unknown host,
   *    socket timeout or connect failure: the matching RDBMS exception.
   *  - `SQLException` caused by an `InterruptedException`: the interruption is rethrown.
   *  - any other `SQLException` (including a `NetException` with an unrecognised or missing
   *    cause, which previously escaped as a `MatchError`): translated from its error code and
   *    SQLState.
   *  - `JdbcLocationException`: rethrown unchanged.
   *  - any other non-fatal throwable: wrapped in a `JdbcLocationException`.
   */
  override def wrapSQLException[T](f: => T): T = {
    try {
      f
    } catch {
      case ex: SQLException => ex.getCause match {
          case inner: NetException => inner.getCause match {
              case _: UnknownHostException => throw new RDBMSUnknownHostException(hostname, ex)
              case _: SocketTimeoutException => throw new RDBMSConnectTimeoutException(hostname, ex)
              case _: ConnectException => throw new RDBMSConnectErrorException(hostname, ex)
              case _ => rethrowSQLException(ex)
            }
          case interrupted: InterruptedException => throw interrupted
          case _ => rethrowSQLException(ex)
        }
      case ex: JdbcLocationException => throw ex
      case NonFatal(t) =>
        logger.warn("Unexpected SQL error.", t)
        throw new JdbcLocationException(s"unexpected database error", t)
    }
  }

  // Translates a SQLException using its vendor error code first and its SQLState class second.
  private def rethrowSQLException(ex: SQLException): Nothing = {
    val state = Option(ex.getSQLState)
    def stateStartsWith(prefix: String): Boolean = state.exists(_.startsWith(prefix))

    // TODO (ctm): Find documentation of Oracle error codes and check if it is best to map ORA-<errorCode> here.
    if (ex.getErrorCode == 1017) {
      // ORA-01017: invalid username/password; logon denied
      throw new AuthenticationFailedException(ex)
    } else if (stateStartsWith("28")) {
      throw new AuthenticationFailedException(ex)
    } else if (stateStartsWith("08")) {
      throw new RDBMSConnectErrorException(hostname, ex)
    } else if (stateStartsWith("58")) {
      throw new JdbcLocationException(s"database system error: ${ex.getMessage}", ex)
    } else if (stateStartsWith("0A")) {
      throw new JdbcLocationException(s"database feature not supported: ${ex.getMessage}", ex)
    } else if (stateStartsWith("2E")) {
      throw new JdbcLocationException(s"database invalid connection name: ${ex.getMessage}", ex)
    } else {
      logger.warn(s"Unexpected SQL error (code: ${ex.getErrorCode}; state: ${ex.getSQLState}).", ex)
      throw new JdbcLocationException(ex.getMessage, ex)
    }
  }

}
36 changes: 36 additions & 0 deletions sources/src/main/scala/raw/sources/jdbc/oracle/OracleClients.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2023 RAW Labs S.A.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0, included in the file
* licenses/APL.txt.
*/

package raw.sources.jdbc.oracle

import raw.client.api.LocationDescription
import raw.creds.api.OracleCredential
import raw.sources.api.{LocationException, SourceContext}

/** Factory of [[OracleClient]] instances for a location description. */
object OracleClients {

  /**
   * Builds a client for the given database.
   *
   * When the location carries a `db-host` setting the credential is assembled from the location
   * settings; otherwise it is looked up in the credentials service under `dbName`.
   *
   * @param dbName   database name (also the lookup key in the credentials service).
   * @param location location description that may carry inline connection settings.
   * @throws LocationException if no Oracle credential is registered for `dbName`.
   */
  def get(dbName: String, location: LocationDescription)(implicit sourceContext: SourceContext): OracleClient = {
    val credential: OracleCredential = location.getStringSetting("db-host") match {
      case Some(host) => inlineCredential(host, dbName, location)
      case _ => registeredCredential(dbName)
    }
    new OracleClient(credential)(sourceContext.settings)
  }

  // Credential described entirely by the settings of the location itself.
  private def inlineCredential(host: String, dbName: String, location: LocationDescription): OracleCredential = {
    val port = location.getIntSetting("db-port")
    val userName = location.getStringSetting("db-username")
    val password = location.getStringSetting("db-password")
    OracleCredential(host, port, dbName, userName, password)
  }

  // Credential registered for the current user in the credentials service.
  private def registeredCredential(dbName: String)(implicit sourceContext: SourceContext): OracleCredential = {
    val registered = sourceContext.credentialsService.getRDBMSServer(sourceContext.user, dbName)
    registered match {
      case Some(oracle: OracleCredential) => oracle
      case _ => throw new LocationException(s"no credential found for oracle: $dbName")
    }
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2023 RAW Labs S.A.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0, included in the file
* licenses/APL.txt.
*/

package raw.sources.jdbc.oracle

import java.io.Closeable
import raw.sources.jdbc.api._

/**
 * Location of a whole Oracle database.
 *
 * @param cli    client used to talk to the database.
 * @param dbName database name, used to build the location URIs.
 */
class OracleLocation(
    cli: OracleClient,
    dbName: String
) extends JdbcLocation(cli, "oracle", dbName) {

  override def rawUri: String = s"oracle:$dbName"

  /**
   * Lists the schemas of the database as `oracle:<database>/<schema>` URIs.
   *
   * The returned iterator wraps the client's schema listing; closing it closes that listing.
   */
  override def listSchemas(): Iterator[String] with Closeable = {
    val schemas = cli.listSchemas
    new Iterator[String] with Closeable {
      override def hasNext: Boolean = schemas.hasNext

      override def next(): String = {
        val schema = schemas.next()
        s"oracle:$dbName/$schema"
      }

      override def close(): Unit = schemas.close()
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 RAW Labs S.A.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0, included in the file
* licenses/APL.txt.
*/

package raw.sources.jdbc.oracle

import raw.client.api.LocationDescription
import raw.sources.api.{LocationException, SourceContext}
import raw.sources.jdbc.api.{JdbcLocation, JdbcLocationBuilder}

/** Builds [[OracleLocation]]s from `oracle:<database>` (or `oracle://<database>`) URLs. */
class OracleLocationBuilder extends JdbcLocationBuilder {

  // Whole-URL pattern: the single capture group is the database name, which may not contain '/'.
  private val databaseUrl = """oracle:(?://)?([^/]+)""".r

  override def schemes: Seq[String] = Seq("oracle")

  /**
   * Creates the location for a database-level Oracle URL.
   *
   * @throws LocationException if the URL is not of the form `oracle:<database>`.
   */
  override def build(location: LocationDescription)(implicit sourceContext: SourceContext): JdbcLocation = {
    location.url match {
      case databaseUrl(name) =>
        val client = OracleClients.get(name, location)
        // The database name is taken from the client's credential rather than from the URL.
        new OracleLocation(client, client.database.get)
      case _ => throw new LocationException("not an oracle database location")
    }
  }

}
Loading

0 comments on commit 7b1eaa4

Please sign in to comment.