From 6c331584aa461524b5ec1f0088e0a69054e09352 Mon Sep 17 00:00:00 2001
From: Jinghui Mo
Date: Fri, 31 Mar 2023 10:08:21 -0400
Subject: [PATCH] Improve debug logging

---
 .../source/accessor/DataSourceAccessor.scala       |  5 ++++-
 .../PathPartitionedTimeSeriesSourceAccessor.scala  |  1 +
 .../feathr/offline/util/FeathrUtils.scala          | 15 ++++++++-------
 .../linkedin/feathr/swj/SlidingWindowJoin.scala    |  2 +-
 4 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/DataSourceAccessor.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/DataSourceAccessor.scala
index f2ff8c800..994df1c43 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/DataSourceAccessor.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/DataSourceAccessor.scala
@@ -6,6 +6,7 @@ import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
 import com.linkedin.feathr.offline.source.{DataSource, SourceFormatType}
 import com.linkedin.feathr.offline.util.PartitionLimiter
 import com.linkedin.feathr.offline.util.datetime.DateTimeInterval
+import org.apache.logging.log4j.LogManager
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
 import java.io.File
@@ -27,6 +28,7 @@
 
 private[offline] object DataSourceAccessor {
 
+  private val log = LogManager.getLogger(getClass)
   /**
    * create time series/composite source that contains multiple day/hour data
    *
@@ -49,7 +51,8 @@
       addTimestampColumn: Boolean = false,
       isStreaming: Boolean = false,
       dataPathHandlers: List[DataPathHandler]): DataSourceAccessor = { //TODO: Add tests
-
+    val info = s"DataSourceAccessor handling ${source}, with interval ${dateIntervalOpt.getOrElse("None")}"
+    log.info(info)
     val dataAccessorHandlers: List[DataAccessorHandler] = dataPathHandlers.map(_.dataAccessorHandler)
     val dataLoaderHandlers: List[DataLoaderHandler] = dataPathHandlers.map(_.dataLoaderHandler)
 
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/PathPartitionedTimeSeriesSourceAccessor.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/PathPartitionedTimeSeriesSourceAccessor.scala
index 9c205a44f..908f4be2d 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/PathPartitionedTimeSeriesSourceAccessor.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/PathPartitionedTimeSeriesSourceAccessor.scala
@@ -150,6 +150,7 @@
         val df = fileLoaderFactory.create(path).loadDataFrame()
         (df, interval)
       })
+    log.info(s"Reading datasets for interval ${timeInterval} from paths: ${pathList.mkString(", ")}")
 
     if (dataframes.isEmpty) {
       val errMsg = s"Input data is empty for creating TimeSeriesSource. No available " +
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala
index e18d154b3..09c972f13 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala
@@ -196,19 +196,20 @@
     if (isDebugMode(ss)) {
       val basePath = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.DEBUG_OUTPUT_PATH)
       val debugFeatureNames = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.DEBUG_FEATURE_NAMES)
-        .split(FeathrUtils.STRING_PARAMETER_DELIMITER).toSet
+        .split(FeathrUtils.STRING_PARAMETER_DELIMITER).filter(_.nonEmpty).toSet
       val outputNumParts = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.DEBUG_OUTPUT_PART_NUM)
-      val featureNames = "features_" + features.mkString("_") + "_"
-      if (debugFeatureNames.isEmpty || features.intersect(debugFeatureNames).nonEmpty) {
-        val savePath = SimplePath(basePath + "/" + featureNames + pathSuffix)
-        log.info(s"${tag}, Start dumping data ${featureNames} to ${savePath}")
+      val featureNames = "_for_features_" + features.mkString("_") + "_"
+      if (features.nonEmpty && (debugFeatureNames.isEmpty || features.intersect(debugFeatureNames).nonEmpty)) {
+        val savePath = SimplePath(basePath + "/" + tag.replaceAll("\\W", "_") +
+          featureNames.slice(0, 20) + pathSuffix)
+        log.info(s"${tag}, Start dumping data ${features.mkString(",")} to ${savePath}")
         if (!df.isEmpty) {
           SparkIOUtils.writeDataFrame(df, savePath, Map(FeathrUtils.DEBUG_OUTPUT_PART_NUM -> outputNumParts), List())
         }
-        log.info(s"{tag}. Finish dumping data ${featureNames} to ${savePath}")
+        log.info(s"${tag}. Finish dumping data ${features.mkString(",")} to ${savePath}")
       } else {
         log.info(s"{tag}. Skipping dumping data as feature names to debug are ${debugFeatureNames}, " +
-          s"and current dataframe has feature ${featureNames}")
+          s"and current dataframe has feature ${features.mkString(",")}")
       }
     }
   }
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/swj/SlidingWindowJoin.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/swj/SlidingWindowJoin.scala
index 806db1239..78b97a282 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/swj/SlidingWindowJoin.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/swj/SlidingWindowJoin.scala
@@ -154,7 +154,7 @@
     val pathBaseSuffix = "features_" + featureNames.mkString("_")
     val leftJoinColumns = Seq(JOIN_KEY_COL_NAME)
     val leftKeyDf = labelDFPartitioned.select(leftJoinColumns.head, leftJoinColumns.tail: _*)
-    FeathrUtils.dumpDebugInfo(spark, leftKeyDf, featureNames, "observation data", pathBaseSuffix + "for SWA before join" )
+    FeathrUtils.dumpDebugInfo(spark, leftKeyDf, featureNames, "observation data", pathBaseSuffix + "_for_SWA_before_join")
     FeathrUtils.dumpDebugInfo(spark, factTransformedDf, featureNames, "SWA feature data", pathBaseSuffix + "_swa_feature")
     }
 }
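
Note (not part of the patch itself): the added .filter(_.nonEmpty) in FeathrUtils matters
because "".split(delimiter) returns Array(""), so an unset DEBUG_FEATURE_NAMES would otherwise
produce a set containing the empty string; debugFeatureNames.isEmpty would then be false and
the "empty config means dump everything" default would silently stop working.

The snippet below is a minimal, self-contained sketch (not Feathr code) that replays the new
save-path construction from the FeathrUtils hunk above, so the resulting directory layout can
be seen at a glance. All concrete values (base path, tag, feature names, suffix) are
hypothetical examples, not values taken from the patch.

  // Sketch of the save-path logic added in FeathrUtils.dumpDebugInfo above.
  // Every concrete value here is a hypothetical example, not from the patch.
  object DebugPathSketch extends App {
    val basePath = "/tmp/feathr_debug"                   // stands in for DEBUG_OUTPUT_PATH
    val tag = "observation data"                         // tag passed by SlidingWindowJoin
    val features = Seq("f_trip_distance", "f_trip_time") // features on the dumped dataframe
    val pathSuffix = "features_f_trip_distance_f_trip_time_for_SWA_before_join"

    val featureNames = "_for_features_" + features.mkString("_") + "_"
    val savePath = basePath + "/" + tag.replaceAll("\\W", "_") + // non-word chars become "_"
      featureNames.slice(0, 20) + pathSuffix                    // feature list capped at 20 chars

    println(savePath)
    // /tmp/feathr_debug/observation_data_for_features_f_tripfeatures_f_trip_distance_f_trip_time_for_SWA_before_join
  }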