Skip to content

Commit

Permalink
Add Teradata.
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelbranco80 committed Jun 7, 2024
1 parent 146daa5 commit 677bc9f
Show file tree
Hide file tree
Showing 14 changed files with 361 additions and 6 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ lazy val sources = (project in file("sources"))
mysqlDeps,
mssqlDeps,
snowflakeDeps,
oracleDeps
oracleDeps,
teradataDeps
)
)

Expand Down
1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ object Dependencies {
val mssqlDeps = "com.microsoft.sqlserver" % "mssql-jdbc" % "7.0.0.jre10"
val snowflakeDeps = "net.snowflake" % "snowflake-jdbc" % "3.13.33"
val oracleDeps = "com.oracle.database.jdbc" % "ojdbc10" % "19.23.0.0"
val teradataDeps = "com.teradata.jdbc" % "terajdbc" % "20.00.00.24"
val icuDeps = "com.ibm.icu" % "icu4j" % "73.2"
val poiDeps = Seq(
"org.apache.poi" % "poi" % "5.2.3",
Expand Down
10 changes: 7 additions & 3 deletions sources/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
exports raw.sources.jdbc.sqlite;
exports raw.sources.jdbc.sqlserver;
exports raw.sources.jdbc.oracle;
exports raw.sources.jdbc.teradata;

opens raw.creds.api to
com.fasterxml.jackson.databind;
Expand Down Expand Up @@ -111,7 +112,8 @@
raw.sources.jdbc.snowflake.SnowflakeLocationBuilder,
raw.sources.jdbc.sqlite.SqliteLocationBuilder,
raw.sources.jdbc.sqlserver.SqlServerLocationBuilder,
raw.sources.jdbc.oracle.OracleLocationBuilder;
raw.sources.jdbc.oracle.OracleLocationBuilder,
raw.sources.jdbc.teradata.TeradataLocationBuilder;

uses raw.sources.jdbc.api.JdbcSchemaLocationBuilder;

Expand All @@ -121,7 +123,8 @@
raw.sources.jdbc.snowflake.SnowflakeSchemaLocationBuilder,
raw.sources.jdbc.sqlite.SqliteSchemaLocationBuilder,
raw.sources.jdbc.sqlserver.SqlServerSchemaLocationBuilder,
raw.sources.jdbc.oracle.OracleSchemaLocationBuilder;
raw.sources.jdbc.oracle.OracleSchemaLocationBuilder,
raw.sources.jdbc.teradata.TeradataSchemaLocationBuilder;

uses raw.sources.jdbc.api.JdbcTableLocationBuilder;

Expand All @@ -131,5 +134,6 @@
raw.sources.jdbc.snowflake.SnowflakeTableLocationBuilder,
raw.sources.jdbc.sqlite.SqliteTableLocationBuilder,
raw.sources.jdbc.sqlserver.SqlServerTableLocationBuilder,
raw.sources.jdbc.oracle.OracleTableLocationBuilder;
raw.sources.jdbc.oracle.OracleTableLocationBuilder,
raw.sources.jdbc.teradata.TeradataTableLocationBuilder;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ raw.sources.jdbc.pgsql.PostgresqlLocationBuilder
raw.sources.jdbc.mysql.MySqlLocationBuilder
raw.sources.jdbc.sqlserver.SqlServerLocationBuilder
raw.sources.jdbc.oracle.OracleLocationBuilder
raw.sources.jdbc.teradata.TeradataLocationBuilder
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ raw.sources.jdbc.snowflake.SnowflakeSchemaLocationBuilder
raw.sources.jdbc.pgsql.PostgresqlSchemaLocationBuilder
raw.sources.jdbc.mysql.MySqlSchemaLocationBuilder
raw.sources.jdbc.sqlserver.SqlServerSchemaLocationBuilder
raw.sources.jdbc.oracle.OracleSchemaLocationBuilder
raw.sources.jdbc.oracle.OracleSchemaLocationBuilder
raw.sources.jdbc.teradata.TeradataSchemaLocationBuilder
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ raw.sources.jdbc.snowflake.SnowflakeTableLocationBuilder
raw.sources.jdbc.pgsql.PostgresqlTableLocationBuilder
raw.sources.jdbc.mysql.MySqlTableLocationBuilder
raw.sources.jdbc.sqlserver.SqlServerTableLocationBuilder
raw.sources.jdbc.oracle.OracleTableLocationBuilder
raw.sources.jdbc.oracle.OracleTableLocationBuilder
raw.sources.jdbc.teradata.TeradataTableLocationBuilder
114 changes: 114 additions & 0 deletions sources/src/main/scala/raw/sources/jdbc/teradata/TeradataClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright 2023 RAW Labs S.A.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0, included in the file
* licenses/APL.txt.
*/

package raw.sources.jdbc.teradata

import raw.utils.RawSettings
import raw.creds.api.TeradataCredential
import raw.sources.jdbc.api._

import java.net.{NoRouteToHostException, SocketTimeoutException, UnknownHostException}
import java.sql.{Connection, DriverManager, ResultSetMetaData, SQLException}
import scala.collection.mutable
import scala.util.control.NonFatal

class TeradataClient(db: TeradataCredential)(implicit settings: RawSettings) extends JdbcClient {

Class.forName("com.teradata.jdbc.TeraDriver")

override val vendor: String = "teradata"
override val connectionString: String = {
// (ctm) Received null parameters while running rawcli tests.
val params: Seq[(String, String)] = Option(db.parameters).getOrElse(Map.empty).toSeq ++
db.port.map(port => Seq(("DBS_PORT", port.toString))).getOrElse(Seq.empty)
if (params.nonEmpty) {
s"jdbc:$vendor://${db.host}/${params.map(p => s"${p._1}=${p._2}").mkString(",")}"
} else {
s"jdbc:$vendor://${db.host}"
}
}
override val username: Option[String] = db.username
override val password: Option[String] = db.password

override val hostname: String = db.host
override val database: Option[String] = None

override def getConnection: Connection = {
wrapSQLException {
// Teradata jdbc connections does not have the setNetworkTimeout
DriverManager.getConnection(connectionString, username.orNull, password.orNull)
}
}

override def tableMetadata(database: Option[String], maybeSchema: Option[String], table: String): TableMetadata = {
val schema = maybeSchema.get
val conn = getConnection
try {
val query = s"""select top 1 * from "$schema"."$table" ;"""
val stmt = wrapSQLException(conn.prepareStatement(query))
val meta = wrapSQLException(stmt.getMetaData)
wrapSQLException(stmt.cancel())
getTableTypeFromResultSetMetadata(meta)
} finally {
conn.close()
}
}

private def getTableTypeFromResultSetMetadata(res: ResultSetMetaData): TableMetadata = {
val columns = mutable.ListBuffer[TableColumn]()
(1 to wrapSQLException(res.getColumnCount)).foreach { n =>
val columnName = wrapSQLException(res.getColumnName(n))
val columnType = wrapSQLException(res.getColumnType(n))
val nullability = wrapSQLException(res.isNullable(n))
columns += TableColumn(columnName, JdbcColumnType(columnType, nullability))
}
TableMetadata(columns.to, None)
}

override def wrapSQLException[T](f: => T): T = {
try {
f
} catch {
// TODO (ctm): check Teradata exceptions
case ex: SQLException => ex.getCause match {
case _: UnknownHostException | _: NoRouteToHostException =>
// (ctm) In the Python CLI tests, the NoRouteToHostException also happens in the test with bad port
// RuntimeErrorsSourceTeradataTestCase.test_register_bad_port_timeout the host is correct but the port is wrong.
throw new RDBMSUnknownHostException(hostname, ex)
case _: SocketTimeoutException => throw new RDBMSConnectTimeoutException(hostname, ex)
case int: InterruptedException => throw int
case _ =>
// Some more codes here (DB2 Universal Messages manual), various databases have varying degrees of compliance
//https://www.ibm.com/support/knowledgecenter/en/SS6NHC/com.ibm.swg.im.dashdb.messages.doc/doc/rdb2stt.html
if (ex.getSQLState != null && ex.getSQLState.startsWith("28")) {
throw new AuthenticationFailedException(ex)
} else if (ex.getSQLState != null && ex.getSQLState.startsWith("08")) {
throw new RDBMSConnectErrorException(hostname, ex)
} else if (ex.getSQLState != null && ex.getSQLState.startsWith("58")) {
throw new JdbcLocationException(s"database system error: ${ex.getMessage}", ex)
} else if (ex.getSQLState != null && ex.getSQLState.startsWith("0A")) {
throw new JdbcLocationException(s"database feature not supported: ${ex.getMessage}", ex)
} else if (ex.getSQLState != null && ex.getSQLState.startsWith("2E")) {
throw new JdbcLocationException(s"database invalid connection name: ${ex.getMessage}", ex)
} else {
logger.warn(s"Unexpected SQL error (code: ${ex.getErrorCode}; state: ${ex.getSQLState}).", ex)
throw new JdbcLocationException(ex.getMessage, ex)
}
}
case ex: JdbcLocationException => throw ex
case NonFatal(t) =>
logger.warn("Unexpected SQL error.", t)
throw new JdbcLocationException(s"unexpected database error", t)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2023 RAW Labs S.A.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0, included in the file
* licenses/APL.txt.
*/

package raw.sources.jdbc.teradata

import raw.creds.api.TeradataCredential
import raw.sources.api.{LocationException, SourceContext}

object TeradataClients {

def get(dbName: String)(implicit sourceContext: SourceContext): TeradataClient = {
sourceContext.credentialsService.getRDBMSServer(sourceContext.user, dbName) match {
case Some(cred: TeradataCredential) => new TeradataClient(cred)(sourceContext.settings)
case _ => throw new LocationException(s"no credential found for teradata: $dbName")
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2023 RAW Labs S.A.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0, included in the file
* licenses/APL.txt.
*/

package raw.sources.jdbc.teradata

import raw.sources.jdbc.api.JdbcLocation

import java.io.Closeable

class TeradataLocation(
cli: TeradataClient,
dbName: String
) extends JdbcLocation(cli, "teradata", dbName) {

override def rawUri: String = s"teradata:$dbName"

override def listSchemas(): Iterator[String] with Closeable = {
new Iterator[String] with Closeable {
private val it = cli.listSchemas

override def hasNext: Boolean = it.hasNext

override def next(): String = s"teradata:$dbName/${it.next()}"

override def close(): Unit = it.close()
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 RAW Labs S.A.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0, included in the file
* licenses/APL.txt.
*/

package raw.sources.jdbc.teradata

import raw.client.api.LocationDescription
import raw.sources.api.{LocationException, SourceContext}
import raw.sources.jdbc.api.{JdbcLocation, JdbcLocationBuilder}

class TeradataLocationBuilder extends JdbcLocationBuilder {

private val teradataDbRegex = """teradata:(?://)?([^/]+)""".r

override def schemes: Seq[String] = Seq("teradata")

override def build(location: LocationDescription)(implicit sourceContext: SourceContext): JdbcLocation = {
location.url match {
case teradataDbRegex(dbName) =>
val db = TeradataClients.get(dbName)
new TeradataLocation(db, dbName)
case _ => throw new LocationException("not a teradata database location")
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2023 RAW Labs S.A.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0, included in the file
* licenses/APL.txt.
*/

package raw.sources.jdbc.teradata

import java.io.Closeable
import raw.sources.jdbc.api.JdbcSchemaLocation

// This might be misleading, this is a Teradata database but works in a similar way to a Oracle schema
// so just remember that like oracle users are also 'databases/schemas'
class TeradataSchema(
cli: TeradataClient,
dbName: String,
schema: String
) extends JdbcSchemaLocation(cli, Some(schema)) {

override def rawUri: String = s"teradata:$dbName/$schema"

override def listTables(): Iterator[String] with Closeable = {
new Iterator[String] with Closeable {
private val it = cli.listTables(schema)

override def hasNext: Boolean = it.hasNext

override def next(): String = s"teradata:$dbName/$schema/${it.next()}"

override def close(): Unit = it.close()
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2023 RAW Labs S.A.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0, included in the file
* licenses/APL.txt.
*/

package raw.sources.jdbc.teradata

import raw.client.api.LocationDescription
import raw.sources.api.{LocationException, SourceContext}
import raw.sources.jdbc.api.{JdbcSchemaLocation, JdbcSchemaLocationBuilder}

class TeradataSchemaLocationBuilder extends JdbcSchemaLocationBuilder {

private val schemaRegex = """teradata:(?://)?([^/]+)/([^/]+)""".r

override def schemes: Seq[String] = Seq("teradata")

override def build(location: LocationDescription)(implicit sourceContext: SourceContext): JdbcSchemaLocation = {
location.url match {
case schemaRegex(dbName, schema) =>
val db = TeradataClients.get(dbName)
new TeradataSchema(db, dbName, schema)
case _ => throw new LocationException("not a teradata schema location")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2023 RAW Labs S.A.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0, included in the file
* licenses/APL.txt.
*/

package raw.sources.jdbc.teradata

import raw.sources.jdbc.api.JdbcTableLocation

class TeradataTable(
cli: TeradataClient,
dbName: String,
schema: String,
table: String
) extends JdbcTableLocation(cli, "teradata", dbName, table, Some(schema)) {

override def rawUri: String = s"teradata:$dbName/$schema/$table"

}
Loading

0 comments on commit 677bc9f

Please sign in to comment.