21 changes: 12 additions & 9 deletions .github/workflows/scala2.yml
@@ -8,14 +8,17 @@ on:
 
 jobs:
   build:
-
+    strategy:
+      fail-fast: false
+      matrix:
+        scala: ["2.12.12", "2.11.12"]
+        spark: ["2.4.0", "3.0.0"]
+        exclude:
+          - scala: "2.11.12"
+            spark: "3.0.0"
     runs-on: ubuntu-latest
-
     steps:
-      - uses: actions/checkout@v2
-      - name: Set up JDK 1.8
-        uses: actions/setup-java@v1
-        with:
-          java-version: 1.8
-      - name: Run tests
-        run: sbt test
+      - uses: actions/checkout@v1
+      - uses: olafurpg/setup-scala@v5
+      - name: Test
+        run: sbt -Dspark.version=${{ matrix.spark }} ++${{ matrix.scala }} test
6 changes: 3 additions & 3 deletions build.sbt
@@ -4,10 +4,10 @@ organization := "com.microsoft.sqlserver.jdbc.spark"
 
 version := "1.0.0"
 
-scalaVersion := "2.12.11"
-ThisBuild / useCoursier := false
-val sparkVersion = "3.0.0"
+crossScalaVersions := Seq("2.12.12", "2.11.12")
+scalaVersion := crossScalaVersions.value.head
 
+val sparkVersion = sys.props.get("spark.version").getOrElse("3.0.0")
 javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint")
 
 libraryDependencies ++= Seq(
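Taken together, the workflow and build changes wire the CI matrix into sbt: the job passes the Spark version as a JVM system property (-Dspark.version=...) and selects the Scala version with sbt's ++ cross-build switch, while build.sbt reads the property through sys.props and falls back to 3.0.0. Below is a minimal sketch of how the resolved version would typically feed a dependency; the spark-sql artifact and "provided" scope are assumptions here, since the diff truncates the actual libraryDependencies list.

val sparkVersion = sys.props.get("spark.version").getOrElse("3.0.0")

crossScalaVersions := Seq("2.12.12", "2.11.12")
scalaVersion := crossScalaVersions.value.head

libraryDependencies ++= Seq(
  // resolves to 2.4.0 or 3.0.0 depending on the -Dspark.version passed by the CI matrix
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"
)

Run locally, sbt -Dspark.version=2.4.0 ++2.11.12 test reproduces one cell of the matrix (Spark 2.4.0 on Scala 2.11.12).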
@@ -4,7 +4,7 @@ import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
 import java.sql.{Connection, ResultSet, ResultSetMetaData, SQLException}
 
 import com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils.{getColMetaData, getEmptyResultSet, mssqlTruncateTable}
-import org.apache.spark.internal.Logging
+import com.microsoft.sqlserver.jdbc.spark.Logging
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.tableExists
 
@@ -2,7 +2,7 @@ package com.microsoft.sqlserver.jdbc.spark
 
 import java.sql.{Connection, ResultSet, SQLException}
 
-import org.apache.spark.internal.Logging
+import com.microsoft.sqlserver.jdbc.spark.Logging
 import org.apache.spark.sql.{SQLContext, DataFrame, SaveMode}
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.createConnectionFactory
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
278 changes: 278 additions & 0 deletions src/main/scala/com/microsoft/sqlserver/jdbc/spark/Logging.scala
@@ -0,0 +1,278 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.microsoft.sqlserver.jdbc.spark

import scala.collection.JavaConverters._

import org.apache.log4j._
import org.apache.log4j.spi.{Filter, LoggingEvent}
import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.impl.StaticLoggerBinder

import org.apache.spark.util.Utils

/**
 * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
 * logging messages at different levels using methods that only evaluate parameters lazily if the
 * log level is enabled.
 */
trait Logging {

  // Make the log field transient so that objects with Logging can
  // be serialized and used on another machine
  @transient private var log_ : Logger = null

  // Method to get the logger name for this object
  protected def logName = {
    // Ignore trailing $'s in the class names for Scala objects
    this.getClass.getName.stripSuffix("$")
  }

  // Method to get or create the logger for this object
  protected def log: Logger = {
    if (log_ == null) {
      initializeLogIfNecessary(false)
      log_ = LoggerFactory.getLogger(logName)
    }
    log_
  }

  // Log methods that take only a String
  protected def logInfo(msg: => String): Unit = {
    if (log.isInfoEnabled) log.info(msg)
  }

  protected def logDebug(msg: => String): Unit = {
    if (log.isDebugEnabled) log.debug(msg)
  }

  protected def logTrace(msg: => String): Unit = {
    if (log.isTraceEnabled) log.trace(msg)
  }

  protected def logWarning(msg: => String): Unit = {
    if (log.isWarnEnabled) log.warn(msg)
  }

  protected def logError(msg: => String): Unit = {
    if (log.isErrorEnabled) log.error(msg)
  }

  // Log methods that take Throwables (Exceptions/Errors) too
  protected def logInfo(msg: => String, throwable: Throwable): Unit = {
    if (log.isInfoEnabled) log.info(msg, throwable)
  }

  protected def logDebug(msg: => String, throwable: Throwable): Unit = {
    if (log.isDebugEnabled) log.debug(msg, throwable)
  }

  protected def logTrace(msg: => String, throwable: Throwable): Unit = {
    if (log.isTraceEnabled) log.trace(msg, throwable)
  }

  protected def logWarning(msg: => String, throwable: Throwable): Unit = {
    if (log.isWarnEnabled) log.warn(msg, throwable)
  }

  protected def logError(msg: => String, throwable: Throwable): Unit = {
    if (log.isErrorEnabled) log.error(msg, throwable)
  }

  protected def isTraceEnabled(): Boolean = {
    log.isTraceEnabled
  }

  protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit = {
    initializeLogIfNecessary(isInterpreter, silent = false)
  }

  protected def initializeLogIfNecessary(
      isInterpreter: Boolean,
      silent: Boolean = false
  ): Boolean = {
    if (!Logging.initialized) {
      Logging.initLock.synchronized {
        if (!Logging.initialized) {
          initializeLogging(isInterpreter, silent)
          return true
        }
      }
    }
    false
  }

  // For testing
  private[spark] def initializeForcefully(
      isInterpreter: Boolean,
      silent: Boolean
  ): Unit = {
    initializeLogging(isInterpreter, silent)
  }

  private def initializeLogging(
      isInterpreter: Boolean,
      silent: Boolean
  ): Unit = {
    // Don't use a logger in here, as this is itself occurring during initialization of a logger
    // If Log4j 1.2 is being used, but is not initialized, load a default properties file
    if (Logging.isLog4j12()) {
      val log4j12Initialized =
        LogManager.getRootLogger.getAllAppenders.hasMoreElements
      // scalastyle:off println
      if (!log4j12Initialized) {
        Logging.defaultSparkLog4jConfig = true
        val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
        Option(getClass.getClassLoader.getResource(defaultLogProps)) match {
          case Some(url) =>
            PropertyConfigurator.configure(url)
            if (!silent) {
              System.err.println(
                s"Using Spark's default log4j profile: $defaultLogProps"
              )
            }
          case None =>
            System.err.println(s"Spark was unable to load $defaultLogProps")
        }
      }

      val rootLogger = LogManager.getRootLogger()
      if (Logging.defaultRootLevel == null) {
        Logging.defaultRootLevel = rootLogger.getLevel()
      }

      if (isInterpreter) {
        // Use the repl's main class to define the default log level when running the shell,
        // overriding the root logger's config if they're different.
        val replLogger = LogManager.getLogger(logName)
        val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN)
        // Update the consoleAppender threshold to replLevel
        if (replLevel != rootLogger.getEffectiveLevel()) {
          if (!silent) {
            System.err.printf(
              "Setting default log level to \"%s\".\n",
              replLevel
            )
            System.err.println(
              "To adjust logging level use sc.setLogLevel(newLevel). " +
                "For SparkR, use setLogLevel(newLevel)."
            )
          }
          Logging.sparkShellThresholdLevel = replLevel
          rootLogger.getAllAppenders().asScala.foreach {
            case ca: ConsoleAppender =>
              ca.addFilter(new SparkShellLoggingFilter())
            case _ => // no-op
          }
        }
      }
      // scalastyle:on println
    }
    Logging.initialized = true

    // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
    // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
    log
  }
}

private[spark] object Logging {
  @volatile private var initialized = false
  @volatile private var defaultRootLevel: Level = null
  @volatile private var defaultSparkLog4jConfig = false
  @volatile private[spark] var sparkShellThresholdLevel: Level = null

  val initLock = new Object()
  try {
    // We use reflection here to handle the case where users remove the
    // slf4j-to-jul bridge order to route their logs to JUL.
    val bridgeClass =
      Class
        .forName(
          "org.slf4j.bridge.SLF4JBridgeHandler",
          true,
          Thread.currentThread().getContextClassLoader
        )
        .asInstanceOf[Class[_]]
    bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
    val installed =
      bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
    if (!installed) {
      bridgeClass.getMethod("install").invoke(null)
    }
  } catch {
    case e: ClassNotFoundException => // can't log anything yet so just fail silently
  }

  /**
   * Marks the logging system as not initialized. This does a best effort at resetting the
   * logging system to its initial state so that the next class to use logging triggers
   * initialization again.
   */
  def uninitialize(): Unit =
    initLock.synchronized {
      if (isLog4j12()) {
        if (defaultSparkLog4jConfig) {
          defaultSparkLog4jConfig = false
          LogManager.resetConfiguration()
        } else {
          val rootLogger = LogManager.getRootLogger()
          rootLogger.setLevel(defaultRootLevel)
          sparkShellThresholdLevel = null
        }
      }
      this.initialized = false
    }

  private def isLog4j12(): Boolean = {
    // This distinguishes the log4j 1.2 binding, currently
    // org.slf4j.impl.Log4jLoggerFactory, from the log4j 2.0 binding, currently
    // org.apache.logging.slf4j.Log4jLoggerFactory
    val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
    "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
  }
}

private class SparkShellLoggingFilter extends Filter {

  /**
   * If sparkShellThresholdLevel is not defined, this filter is a no-op.
   * If log level of event is not equal to root level, the event is allowed. Otherwise,
   * the decision is made based on whether the log came from root or some custom configuration
   * @param loggingEvent
   * @return decision for accept/deny log event
   */
  def decide(loggingEvent: LoggingEvent): Int = {
    if (Logging.sparkShellThresholdLevel == null) {
      Filter.NEUTRAL
    } else if (
      loggingEvent.getLevel.isGreaterOrEqual(Logging.sparkShellThresholdLevel)
    ) {
      Filter.NEUTRAL
    } else {
      var logger = loggingEvent.getLogger()
      while (logger.getParent() != null) {
        if (logger.getLevel != null || logger.getAllAppenders.hasMoreElements) {
          return Filter.NEUTRAL
        }
        logger = logger.getParent()
      }
      Filter.DENY
    }
  }
}
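The new file is a verbatim copy of Spark's internal org.apache.spark.internal.Logging trait, relocated into the connector's own package; presumably this decouples the connector from Spark's private internal API, which can differ between the Spark 2.4 and 3.0 builds now covered by the CI matrix. Connector classes mix the trait in and call its lazy logging helpers; the sketch below uses a hypothetical class name (the connector's real call sites are the files whose imports change elsewhere in this diff).

package com.microsoft.sqlserver.jdbc.spark

// Hypothetical example class; any connector class can mix in Logging the same way.
class BulkWriteExample extends Logging {
  def write(rows: Seq[String]): Unit = {
    // The by-name msg parameter means the string is only built if INFO is enabled.
    logInfo(s"Writing ${rows.size} rows")
    try {
      // ... bulk copy work would go here ...
    } catch {
      case e: Exception => logError("Bulk write failed", e)
    }
  }
}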
@@ -3,7 +3,7 @@ package com.microsoft.sqlserver.jdbc.spark
 import java.sql.{ResultSetMetaData, SQLException}
 
 import com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils.{getDBNameFromURL, savePartition}
-import org.apache.spark.internal.Logging
+import com.microsoft.sqlserver.jdbc.spark.Logging
 import org.apache.spark.sql.{DataFrame, Row}
 
 /**
@@ -3,7 +3,7 @@ package com.microsoft.sqlserver.jdbc.spark
 import java.sql.ResultSetMetaData
 
 import com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils.{savePartition}
-import org.apache.spark.internal.Logging
+import com.microsoft.sqlserver.jdbc.spark.Logging
 import org.apache.spark.sql.{DataFrame, Row}
 
 /**
@@ -2,7 +2,7 @@ package com.microsoft.sqlserver.jdbc.spark
 
 import java.sql.{Connection, ResultSetMetaData, SQLException}
 
-import org.apache.spark.internal.Logging
+import com.microsoft.sqlserver.jdbc.spark.Logging
 import org.apache.spark.sql.{DataFrame}
 import com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils._
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
@@ -3,7 +3,7 @@ package com.microsoft.sqlserver.jdbc.spark
 import java.sql.{Connection, ResultSetMetaData, SQLException}
 
 import com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils.{executeUpdate, savePartition}
-import org.apache.spark.internal.Logging
+import com.microsoft.sqlserver.jdbc.spark.Logging
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.createConnectionFactory
 import org.apache.spark.sql.{DataFrame, Row}
@@ -2,7 +2,7 @@ package com.microsoft.sqlserver.jdbc.spark
 
 import java.sql.{Connection, ResultSet, ResultSetMetaData, SQLException}
 
-import org.apache.spark.internal.Logging
+import com.microsoft.sqlserver.jdbc.spark.Logging
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils._