Commit b1fe760

DEV: Update FeatureSource dataframe conversion (#237)

REFACTOR: Remove conversion of whole RDD to DataFrame
FEAT: Add function for slicing rows and columns and converting to DF
1 parent 4df6e32 commit b1fe760

File tree

1 file changed: +10 -11 lines


src/main/scala/au/csiro/variantspark/input/VCFFeatureSource.scala

Lines changed: 10 additions & 11 deletions
@@ -60,20 +60,19 @@ class VCFFeatureSource(vcfSource: VCFSource, converter: VariantToFeatureConverter
     vcfSource.genotypes().map(converterRef.convert)
   }

-  lazy val sampleNamesStructArr: Array[StructField] =
-    sampleNames.map(StructField(_, ByteType, true)).toArray
-
-  lazy val featureDFSchema: StructType =
-    StructType(Seq(StructField("variant_id", StringType, true)) ++ sampleNamesStructArr)
-
-  def toDF(sqlContext: SQLContext): DataFrame = {
+  def head(sqlContext: SQLContext, rowLim: Int = 10, colLim: Int = 10): DataFrame = {
+    lazy val sampleNamesStructArr: Array[StructField] =
+      sampleNames.take(colLim).map(StructField(_, ByteType, true)).toArray
+    lazy val featureDFSchema: StructType =
+      StructType(Seq(StructField("variant_id", StringType, true)) ++ sampleNamesStructArr)
     val sc = sqlContext.sparkContext

-    val featureRDD: RDD[Row] =
-      features.mapPartitions { it =>
-        it.map { f => Row.fromSeq(f.label +: f.valueAsByteArray.toSeq) }
+    val slicedFeatureArray: Array[Row] =
+      features.take(rowLim).map { f =>
+        Row.fromSeq(f.label +: f.valueAsByteArray.take(colLim).toSeq)
       }
-    sqlContext.createDataFrame(featureRDD, featureDFSchema)
+    val slicedFeatureRDD: RDD[Row] = sc.parallelize(slicedFeatureArray)
+    sqlContext.createDataFrame(slicedFeatureRDD, featureDFSchema)
   }

 }
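For context, here is a minimal usage sketch of the new head method, as it might be called from a spark-shell session (where sc is predefined). Only the head signature comes from the diff above; how the VCFFeatureSource is constructed is an assumption based on the two-argument class signature in the hunk header, and vcfSource and converter are placeholders.

import org.apache.spark.sql.SQLContext

// Hypothetical setup: vcfSource and converter are assumed to have been
// built elsewhere; they are not part of this commit.
val sqlContext = new SQLContext(sc)
val featureSource = new VCFFeatureSource(vcfSource, converter)

// Preview the default slice: the first 10 variants (rows) by 10 samples
// (columns), as a DataFrame with a variant_id column plus one ByteType
// column per retained sample.
featureSource.head(sqlContext).show()

// Request a smaller slice: 5 variants across 3 samples.
featureSource.head(sqlContext, rowLim = 5, colLim = 3).show()

Note the design choice visible in the diff: features.take(rowLim) collects the slice to the driver and sc.parallelize redistributes it, so head is intended for small previews, while the old toDF, which converted the whole RDD, is removed.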
