From 677bc9f273409ad81584f1cfb35f60f2adda16be Mon Sep 17 00:00:00 2001 From: Miguel Branco Date: Fri, 7 Jun 2024 18:26:34 +0200 Subject: [PATCH] Add Teradata. --- build.sbt | 3 +- project/Dependencies.scala | 1 + sources/src/main/java/module-info.java | 10 +- .../raw.sources.jdbc.api.JdbcLocationBuilder | 1 + ...sources.jdbc.api.JdbcSchemaLocationBuilder | 3 +- ....sources.jdbc.api.JdbcTableLocationBuilder | 3 +- .../jdbc/teradata/TeradataClient.scala | 114 ++++++++++++++++++ .../jdbc/teradata/TeradataClients.scala | 27 +++++ .../jdbc/teradata/TeradataLocation.scala | 38 ++++++ .../teradata/TeradataLocationBuilder.scala | 34 ++++++ .../jdbc/teradata/TeradataSchema.scala | 40 ++++++ .../TeradataSchemaLocationBuilder.scala | 33 +++++ .../sources/jdbc/teradata/TeradataTable.scala | 26 ++++ .../TeradataTableLocationBuilder.scala | 34 ++++++ 14 files changed, 361 insertions(+), 6 deletions(-) create mode 100644 sources/src/main/scala/raw/sources/jdbc/teradata/TeradataClient.scala create mode 100644 sources/src/main/scala/raw/sources/jdbc/teradata/TeradataClients.scala create mode 100644 sources/src/main/scala/raw/sources/jdbc/teradata/TeradataLocation.scala create mode 100644 sources/src/main/scala/raw/sources/jdbc/teradata/TeradataLocationBuilder.scala create mode 100644 sources/src/main/scala/raw/sources/jdbc/teradata/TeradataSchema.scala create mode 100644 sources/src/main/scala/raw/sources/jdbc/teradata/TeradataSchemaLocationBuilder.scala create mode 100644 sources/src/main/scala/raw/sources/jdbc/teradata/TeradataTable.scala create mode 100644 sources/src/main/scala/raw/sources/jdbc/teradata/TeradataTableLocationBuilder.scala diff --git a/build.sbt b/build.sbt index d5d4becf9..b0ab45a3a 100644 --- a/build.sbt +++ b/build.sbt @@ -101,7 +101,8 @@ lazy val sources = (project in file("sources")) mysqlDeps, mssqlDeps, snowflakeDeps, - oracleDeps + oracleDeps, + teradataDeps ) ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 
25e44ab39..004fb5e81 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -67,6 +67,7 @@ object Dependencies { val mssqlDeps = "com.microsoft.sqlserver" % "mssql-jdbc" % "7.0.0.jre10" val snowflakeDeps = "net.snowflake" % "snowflake-jdbc" % "3.13.33" val oracleDeps = "com.oracle.database.jdbc" % "ojdbc10" % "19.23.0.0" + val teradataDeps = "com.teradata.jdbc" % "terajdbc" % "20.00.00.24" val icuDeps = "com.ibm.icu" % "icu4j" % "73.2" val poiDeps = Seq( "org.apache.poi" % "poi" % "5.2.3", diff --git a/sources/src/main/java/module-info.java b/sources/src/main/java/module-info.java index dcfaa354f..1bc50fc82 100644 --- a/sources/src/main/java/module-info.java +++ b/sources/src/main/java/module-info.java @@ -79,6 +79,7 @@ exports raw.sources.jdbc.sqlite; exports raw.sources.jdbc.sqlserver; exports raw.sources.jdbc.oracle; + exports raw.sources.jdbc.teradata; opens raw.creds.api to com.fasterxml.jackson.databind; @@ -111,7 +112,8 @@ raw.sources.jdbc.snowflake.SnowflakeLocationBuilder, raw.sources.jdbc.sqlite.SqliteLocationBuilder, raw.sources.jdbc.sqlserver.SqlServerLocationBuilder, - raw.sources.jdbc.oracle.OracleLocationBuilder; + raw.sources.jdbc.oracle.OracleLocationBuilder, + raw.sources.jdbc.teradata.TeradataLocationBuilder; uses raw.sources.jdbc.api.JdbcSchemaLocationBuilder; @@ -121,7 +123,8 @@ raw.sources.jdbc.snowflake.SnowflakeSchemaLocationBuilder, raw.sources.jdbc.sqlite.SqliteSchemaLocationBuilder, raw.sources.jdbc.sqlserver.SqlServerSchemaLocationBuilder, - raw.sources.jdbc.oracle.OracleSchemaLocationBuilder; + raw.sources.jdbc.oracle.OracleSchemaLocationBuilder, + raw.sources.jdbc.teradata.TeradataSchemaLocationBuilder; uses raw.sources.jdbc.api.JdbcTableLocationBuilder; @@ -131,5 +134,6 @@ raw.sources.jdbc.snowflake.SnowflakeTableLocationBuilder, raw.sources.jdbc.sqlite.SqliteTableLocationBuilder, raw.sources.jdbc.sqlserver.SqlServerTableLocationBuilder, - raw.sources.jdbc.oracle.OracleTableLocationBuilder; + 
raw.sources.jdbc.oracle.OracleTableLocationBuilder, + raw.sources.jdbc.teradata.TeradataTableLocationBuilder; } diff --git a/sources/src/main/resources/META-INF/services/raw.sources.jdbc.api.JdbcLocationBuilder b/sources/src/main/resources/META-INF/services/raw.sources.jdbc.api.JdbcLocationBuilder index 390f6a7f6..0ffc6a7c6 100644 --- a/sources/src/main/resources/META-INF/services/raw.sources.jdbc.api.JdbcLocationBuilder +++ b/sources/src/main/resources/META-INF/services/raw.sources.jdbc.api.JdbcLocationBuilder @@ -4,3 +4,4 @@ raw.sources.jdbc.pgsql.PostgresqlLocationBuilder raw.sources.jdbc.mysql.MySqlLocationBuilder raw.sources.jdbc.sqlserver.SqlServerLocationBuilder raw.sources.jdbc.oracle.OracleLocationBuilder +raw.sources.jdbc.teradata.TeradataLocationBuilder \ No newline at end of file diff --git a/sources/src/main/resources/META-INF/services/raw.sources.jdbc.api.JdbcSchemaLocationBuilder b/sources/src/main/resources/META-INF/services/raw.sources.jdbc.api.JdbcSchemaLocationBuilder index cb465bbf9..c815693f2 100644 --- a/sources/src/main/resources/META-INF/services/raw.sources.jdbc.api.JdbcSchemaLocationBuilder +++ b/sources/src/main/resources/META-INF/services/raw.sources.jdbc.api.JdbcSchemaLocationBuilder @@ -3,4 +3,5 @@ raw.sources.jdbc.snowflake.SnowflakeSchemaLocationBuilder raw.sources.jdbc.pgsql.PostgresqlSchemaLocationBuilder raw.sources.jdbc.mysql.MySqlSchemaLocationBuilder raw.sources.jdbc.sqlserver.SqlServerSchemaLocationBuilder -raw.sources.jdbc.oracle.OracleSchemaLocationBuilder \ No newline at end of file +raw.sources.jdbc.oracle.OracleSchemaLocationBuilder +raw.sources.jdbc.teradata.TeradataSchemaLocationBuilder \ No newline at end of file diff --git a/sources/src/main/resources/META-INF/services/raw.sources.jdbc.api.JdbcTableLocationBuilder b/sources/src/main/resources/META-INF/services/raw.sources.jdbc.api.JdbcTableLocationBuilder index bf7457b11..b9840d37e 100644 --- 
a/sources/src/main/resources/META-INF/services/raw.sources.jdbc.api.JdbcTableLocationBuilder +++ b/sources/src/main/resources/META-INF/services/raw.sources.jdbc.api.JdbcTableLocationBuilder @@ -3,4 +3,5 @@ raw.sources.jdbc.snowflake.SnowflakeTableLocationBuilder raw.sources.jdbc.pgsql.PostgresqlTableLocationBuilder raw.sources.jdbc.mysql.MySqlTableLocationBuilder raw.sources.jdbc.sqlserver.SqlServerTableLocationBuilder -raw.sources.jdbc.oracle.OracleTableLocationBuilder \ No newline at end of file +raw.sources.jdbc.oracle.OracleTableLocationBuilder +raw.sources.jdbc.teradata.TeradataTableLocationBuilder \ No newline at end of file diff --git a/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataClient.scala b/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataClient.scala new file mode 100644 index 000000000..65faf0962 --- /dev/null +++ b/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataClient.scala @@ -0,0 +1,114 @@ +/* + * Copyright 2023 RAW Labs S.A. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0, included in the file + * licenses/APL.txt. 
+ */
+
+package raw.sources.jdbc.teradata
+
+import raw.utils.RawSettings
+import raw.creds.api.TeradataCredential
+import raw.sources.jdbc.api._
+
+import java.net.{NoRouteToHostException, SocketTimeoutException, UnknownHostException}
+import java.sql.{Connection, DriverManager, ResultSetMetaData, SQLException}
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+/**
+ * JDBC client for the Teradata server described by a [[TeradataCredential]].
+ *
+ * The connection URL has the shape `jdbc:teradata://<host>[/KEY=VALUE,KEY=VALUE,...]`: the credential's
+ * parameters come first and `DBS_PORT` is appended when the credential carries a port. The credential's
+ * username and password are handed to `DriverManager` (as `null` when absent).
+ */
+class TeradataClient(db: TeradataCredential)(implicit settings: RawSettings) extends JdbcClient {
+
+  // Load the driver class eagerly: a missing driver jar fails here rather than on the first connection.
+  Class.forName("com.teradata.jdbc.TeraDriver")
+
+  override val vendor: String = "teradata"
+  override val connectionString: String = {
+    // (ctm) Received null parameters while running rawcli tests.
+    val params: Seq[(String, String)] = Option(db.parameters).getOrElse(Map.empty).toSeq ++
+      db.port.map(port => Seq(("DBS_PORT", port.toString))).getOrElse(Seq.empty)
+    val base = s"jdbc:$vendor://${db.host}"
+    if (params.isEmpty) base
+    else params.map { case (key, value) => s"$key=$value" }.mkString(s"$base/", ",", "")
+  }
+  override val username: Option[String] = db.username
+  override val password: Option[String] = db.password
+
+  override val hostname: String = db.host
+  override val database: Option[String] = None
+
+  // Teradata JDBC connections do not have setNetworkTimeout, so none is configured here.
+  override def getConnection: Connection = wrapSQLException {
+    DriverManager.getConnection(connectionString, username.orNull, password.orNull)
+  }
+
+  /**
+   * Describes the columns of `"schema"."table"` using the metadata of a prepared `select top 1 *`
+   * statement; the statement is only prepared, never executed.
+   *
+   * @param database    not used by this implementation
+   * @param maybeSchema schema (Teradata database) holding the table; must be defined
+   * @param table       table name
+   * @throws NoSuchElementException if `maybeSchema` is empty
+   */
+  override def tableMetadata(database: Option[String], maybeSchema: Option[String], table: String): TableMetadata = {
+    val schema = maybeSchema.getOrElse(
+      throw new NoSuchElementException(s"no schema given for teradata table: $table")
+    )
+    val conn = getConnection
+    try {
+      val query = s"""select top 1 * from ${quoteIdentifier(schema)}.${quoteIdentifier(table)} ;"""
+      val stmt = wrapSQLException(conn.prepareStatement(query))
+      // Read the metadata while the statement is still open, then always release the statement.
+      try {
+        getTableTypeFromResultSetMetadata(wrapSQLException(stmt.getMetaData))
+      } finally {
+        stmt.close()
+      }
+    } finally {
+      conn.close()
+    }
+  }
+
+  // Wraps an identifier in double quotes, doubling embedded quotes so a name cannot end the quoting early.
+  private def quoteIdentifier(name: String): String = "\"" + name.replace("\"", "\"\"") + "\""
+
+  // Copies name, JDBC type code and nullability of every column; JDBC column indexes start at 1.
+  private def getTableTypeFromResultSetMetadata(res: ResultSetMetaData): TableMetadata = {
+    val columns = mutable.ListBuffer[TableColumn]()
+    (1 to wrapSQLException(res.getColumnCount)).foreach { n =>
+      val columnName = wrapSQLException(res.getColumnName(n))
+      val columnType = wrapSQLException(res.getColumnType(n))
+      val nullability = wrapSQLException(res.isNullable(n))
+      columns += TableColumn(columnName, JdbcColumnType(columnType, nullability))
+    }
+    TableMetadata(columns.to, None)
+  }
+
+  /**
+   * Runs `f` and translates its failures: a `SQLException` is mapped by its cause (unknown host, no route,
+   * socket timeout) and otherwise by the first two characters of its SQLSTATE. A [[JdbcLocationException]]
+   * raised inside `f` is rethrown untouched, and any other non-fatal error is logged and wrapped.
+   */
+  override def wrapSQLException[T](f: => T): T = {
+    try {
+      f
+    } catch {
+      // TODO (ctm): check Teradata exceptions
+      case ex: SQLException => ex.getCause match {
+          case _: UnknownHostException | _: NoRouteToHostException =>
+            // (ctm) In the Python CLI tests, the NoRouteToHostException also happens in the test with bad port
+            // RuntimeErrorsSourceTeradataTestCase.test_register_bad_port_timeout the host is correct but the port is wrong.
+            throw new RDBMSUnknownHostException(hostname, ex)
+          case _: SocketTimeoutException => throw new RDBMSConnectTimeoutException(hostname, ex)
+          case interrupted: InterruptedException => throw interrupted
+          case _ =>
+            // Some more codes here (DB2 Universal Messages manual), various databases have varying degrees of compliance
+            //https://www.ibm.com/support/knowledgecenter/en/SS6NHC/com.ibm.swg.im.dashdb.messages.doc/doc/rdb2stt.html
+            // A missing SQLSTATE becomes "" and therefore falls through to the generic branch.
+            Option(ex.getSQLState).map(_.take(2)).getOrElse("") match {
+              case "28" => throw new AuthenticationFailedException(ex)
+              case "08" => throw new RDBMSConnectErrorException(hostname, ex)
+              case "58" => throw new JdbcLocationException(s"database system error: ${ex.getMessage}", ex)
+              case "0A" => throw new JdbcLocationException(s"database feature not supported: ${ex.getMessage}", ex)
+              case "2E" => throw new JdbcLocationException(s"database invalid connection name: ${ex.getMessage}", ex)
+              case _ =>
+                logger.warn(s"Unexpected SQL error (code: ${ex.getErrorCode}; state: ${ex.getSQLState}).", ex)
+                throw new JdbcLocationException(ex.getMessage, ex)
+            }
+        }
+      case ex: JdbcLocationException => throw ex
+      case NonFatal(t) =>
+        logger.warn("Unexpected SQL error.", t)
+        throw new JdbcLocationException(s"unexpected database error", t)
+    }
+  }
+
+}
diff --git a/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataClients.scala b/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataClients.scala
new file mode 100644
index 000000000..9509257e7
--- /dev/null
+++ b/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataClients.scala
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2023 RAW Labs S.A.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0, included in the file
+ * licenses/APL.txt.
+ */
+
+package raw.sources.jdbc.teradata
+
+import raw.creds.api.TeradataCredential
+import raw.sources.api.{LocationException, SourceContext}
+
+object TeradataClients {
+  /** Client for the credential registered as `dbName`; [[LocationException]] if it is missing or not Teradata. */
+  def get(dbName: String)(implicit sourceContext: SourceContext): TeradataClient = {
+    sourceContext.credentialsService.getRDBMSServer(sourceContext.user, dbName) match {
+      case Some(teradata: TeradataCredential) => new TeradataClient(teradata)(sourceContext.settings)
+      case _ => throw new LocationException(s"no credential found for teradata: $dbName")
+    }
+  }
+
+}
diff --git a/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataLocation.scala b/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataLocation.scala
new file mode 100644
index 000000000..7251028b5
--- /dev/null
+++ b/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataLocation.scala
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2023 RAW Labs S.A.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0, included in the file
+ * licenses/APL.txt.
+ */
+
+package raw.sources.jdbc.teradata
+
+import raw.sources.jdbc.api.JdbcLocation
+
+import java.io.Closeable
+
+/** JDBC location for the Teradata database `dbName`, reached through `cli`; its URI is `teradata:<dbName>`. */
+class TeradataLocation(
+    cli: TeradataClient,
+    dbName: String
+) extends JdbcLocation(cli, "teradata", dbName) {
+
+  override def rawUri: String = s"teradata:$dbName"
+
+  /** Adapts the client's schema listing: each schema name is returned as `teradata:<dbName>/<schema>`. */
+  override def listSchemas(): Iterator[String] with Closeable = {
+    val schemaNames = cli.listSchemas
+    val uriPrefix = s"teradata:$dbName/"
+    new Iterator[String] with Closeable {
+      override def hasNext: Boolean = schemaNames.hasNext
+      override def next(): String = uriPrefix + schemaNames.next()
+      override def close(): Unit = schemaNames.close()
+    }
+  }
+
+}
diff --git a/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataLocationBuilder.scala b/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataLocationBuilder.scala
new file mode 100644
index 000000000..b423d753c
--- /dev/null
+++ b/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataLocationBuilder.scala
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2023 RAW Labs S.A.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0, included in the file
+ * licenses/APL.txt.
+ */
+
+package raw.sources.jdbc.teradata
+
+import raw.client.api.LocationDescription
+import raw.sources.api.{LocationException, SourceContext}
+import raw.sources.jdbc.api.{JdbcLocation, JdbcLocationBuilder}
+
+/** Builds [[TeradataLocation]]s from `teradata:<dbName>` URLs; a `//` right after the scheme is optional. */
+class TeradataLocationBuilder extends JdbcLocationBuilder {
+
+  private val teradataDbRegex = """teradata:(?://)?([^/]+)""".r
+
+  override def schemes: Seq[String] = Seq("teradata")
+
+  override def build(location: LocationDescription)(implicit sourceContext: SourceContext): JdbcLocation = {
+    val dbName = location.url match {
+      case teradataDbRegex(name) => name
+      case _ => throw new LocationException("not a teradata database location")
+    }
+    new TeradataLocation(TeradataClients.get(dbName), dbName)
+  }
+
+}
diff --git a/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataSchema.scala b/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataSchema.scala
new file mode 100644
index 000000000..5d86483a8
--- /dev/null
+++ b/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataSchema.scala
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2023 RAW Labs S.A.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0, included in the file
+ * licenses/APL.txt.
+ */
+
+package raw.sources.jdbc.teradata
+
+import java.io.Closeable
+import raw.sources.jdbc.api.JdbcSchemaLocation
+
+// Naming caveat: what is called a schema here is a Teradata database. It behaves much like an
+// Oracle schema and, as in Oracle, users are also 'databases/schemas'.
+class TeradataSchema(
+    cli: TeradataClient,
+    dbName: String,
+    schema: String
+) extends JdbcSchemaLocation(cli, Some(schema)) {
+
+  override def rawUri: String = s"teradata:$dbName/$schema"
+
+  // Adapts the client's table listing: every table name becomes a teradata:<dbName>/<schema>/<table>
+  // URI, and closing the returned iterator closes the underlying listing.
+  override def listTables(): Iterator[String] with Closeable = {
+    val tableNames = cli.listTables(schema)
+    val uriPrefix = s"teradata:$dbName/$schema/"
+    new Iterator[String] with Closeable {
+      override def hasNext: Boolean = tableNames.hasNext
+      override def next(): String = uriPrefix + tableNames.next()
+      override def close(): Unit = tableNames.close()
+    }
+  }
+
+}
diff --git a/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataSchemaLocationBuilder.scala b/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataSchemaLocationBuilder.scala
new file mode 100644
index 000000000..7ece8b0b7
--- /dev/null
+++ b/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataSchemaLocationBuilder.scala
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2023 RAW Labs S.A.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0, included in the file
+ * licenses/APL.txt.
+ */
+
+package raw.sources.jdbc.teradata
+
+import raw.client.api.LocationDescription
+import raw.sources.api.{LocationException, SourceContext}
+import raw.sources.jdbc.api.{JdbcSchemaLocation, JdbcSchemaLocationBuilder}
+
+/** Builds [[TeradataSchema]]s from `teradata:<dbName>/<schema>` URLs; a `//` after the scheme is optional. */
+class TeradataSchemaLocationBuilder extends JdbcSchemaLocationBuilder {
+
+  private val schemaRegex = """teradata:(?://)?([^/]+)/([^/]+)""".r
+
+  override def schemes: Seq[String] = Seq("teradata")
+
+  override def build(location: LocationDescription)(implicit sourceContext: SourceContext): JdbcSchemaLocation = {
+    val (dbName, schema) = location.url match {
+      case schemaRegex(db, sch) => (db, sch)
+      case _ => throw new LocationException("not a teradata schema location")
+    }
+    new TeradataSchema(TeradataClients.get(dbName), dbName, schema)
+  }
+}
diff --git a/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataTable.scala b/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataTable.scala
new file mode 100644
index 000000000..b2754b4e6
--- /dev/null
+++ b/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataTable.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2023 RAW Labs S.A.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0, included in the file
+ * licenses/APL.txt.
+ */
+
+package raw.sources.jdbc.teradata
+
+import raw.sources.jdbc.api.JdbcTableLocation
+
+class TeradataTable(
+    cli: TeradataClient,
+    dbName: String,
+    schema: String,
+    table: String
+) extends JdbcTableLocation(cli, "teradata", dbName, table, Some(schema)) {
+  // URI form: teradata:<dbName>/<schema>/<table>
+  override def rawUri: String = s"teradata:$dbName/$schema/$table"
+
+}
diff --git a/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataTableLocationBuilder.scala b/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataTableLocationBuilder.scala
new file mode 100644
index 000000000..3deda23d9
--- /dev/null
+++ b/sources/src/main/scala/raw/sources/jdbc/teradata/TeradataTableLocationBuilder.scala
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2023 RAW Labs S.A.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0, included in the file
+ * licenses/APL.txt.
+ */
+
+package raw.sources.jdbc.teradata
+
+import raw.client.api.LocationDescription
+import raw.sources.api.{LocationException, SourceContext}
+import raw.sources.jdbc.api.{JdbcTableLocation, JdbcTableLocationBuilder}
+
+/** Builds [[TeradataTable]]s from `teradata:<dbName>/<schema>/<table>` URLs; a `//` after the scheme is optional. */
+class TeradataTableLocationBuilder extends JdbcTableLocationBuilder {
+
+  private val teradataTableRegex = """teradata:(?://)?([^/]+)/([^/]+)/([^/]+)""".r
+
+  override def schemes: Seq[String] = Seq("teradata")
+
+  override def build(location: LocationDescription)(implicit sourceContext: SourceContext): JdbcTableLocation = {
+    location.url match {
+      case teradataTableRegex(dbName, schema, table) =>
+        new TeradataTable(TeradataClients.get(dbName), dbName, schema, table)
+      case _ => throw new LocationException("not a teradata table location")
+    }
+  }
+
+}