1414package com .facebook .presto .orc ;
1515
1616import com .google .common .annotations .VisibleForTesting ;
17- import com .google .common .collect .ImmutableList ;
18- import com .google .common .collect .ImmutableSet ;
1917import com .google .common .primitives .Longs ;
2018import io .airlift .units .DataSize ;
2119import io .airlift .units .DataSize .Unit ;
2220
23- import java .util .HashSet ;
21+ import java .util .ArrayList ;
22+ import java .util .Iterator ;
23+ import java .util .List ;
24+ import java .util .OptionalDouble ;
2425import java .util .OptionalInt ;
2526import java .util .Set ;
2627
2728import static com .google .common .base .Preconditions .checkArgument ;
2829import static com .google .common .base .Preconditions .checkState ;
30+ import static com .google .common .collect .ImmutableList .toImmutableList ;
2931import static java .lang .Math .toIntExact ;
3032import static java .util .Objects .requireNonNull ;
31- import static java .util .stream .Collectors .toSet ;
3233
3334public class DictionaryCompressionOptimizer
3435{
@@ -40,8 +41,8 @@ public class DictionaryCompressionOptimizer
4041
4142 static final DataSize DIRECT_COLUMN_SIZE_RANGE = new DataSize (4 , Unit .MEGABYTE );
4243
43- private final Set <DictionaryColumnManager > allWriters ;
44- private final Set <DictionaryColumnManager > directConversionCandidates = new HashSet <>();
44+ private final List <DictionaryColumnManager > allWriters ;
45+ private final List <DictionaryColumnManager > directConversionCandidates = new ArrayList <>();
4546
4647 private final int stripeMinBytes ;
4748 private final int stripeMaxBytes ;
@@ -59,9 +60,9 @@ public DictionaryCompressionOptimizer(
5960 int dictionaryMemoryMaxBytes )
6061 {
6162 requireNonNull (writers , "writers is null" );
62- this .allWriters = ImmutableSet . copyOf ( writers .stream ()
63+ this .allWriters = writers .stream ()
6364 .map (DictionaryColumnManager ::new )
64- .collect (toSet () ));
65+ .collect (toImmutableList ( ));
6566
6667 checkArgument (stripeMinBytes >= 0 , "stripeMinBytes is negative" );
6768 this .stripeMinBytes = stripeMinBytes ;
@@ -143,52 +144,68 @@ public void optimize(int bufferedBytes, int stripeRowCount)
143144 }
144145 }
145146
147+ BufferedBytesCounter bufferedBytesCounter = new BufferedBytesCounter (bufferedBytes , nonDictionaryBufferedBytes );
148+ optimizeDictionaryColumns (stripeRowCount , bufferedBytesCounter );
149+ }
150+
151+ private void optimizeDictionaryColumns (int stripeRowCount , BufferedBytesCounter bufferedBytesCounter )
152+ {
146153 // convert dictionary columns to direct until we are below the high memory limit
147- while (!directConversionCandidates .isEmpty () && dictionaryMemoryBytes > dictionaryMemoryMaxBytesHigh && bufferedBytes < stripeMaxBytes ) {
148- DictionaryCompressionProjection projection = selectDictionaryColumnToConvert (nonDictionaryBufferedBytes , stripeRowCount );
149- int selectDictionaryColumnBufferedBytes = toIntExact (projection .getColumnToConvert ().getBufferedBytes ());
150-
151- OptionalInt directBytes = tryConvertToDirect (projection .getColumnToConvert (), getMaxDirectBytes (bufferedBytes ));
152- if (directBytes .isPresent ()) {
153- bufferedBytes = bufferedBytes + directBytes .getAsInt () - selectDictionaryColumnBufferedBytes ;
154- nonDictionaryBufferedBytes += directBytes .getAsInt ();
155- }
154+ while (!directConversionCandidates .isEmpty ()
155+ && dictionaryMemoryBytes > dictionaryMemoryMaxBytesHigh
156+ && bufferedBytesCounter .getBufferedBytes () < stripeMaxBytes ) {
157+ convertDictionaryColumn (bufferedBytesCounter , stripeRowCount , OptionalDouble .empty ());
156158 }
157159
158- if (bufferedBytes >= stripeMaxBytes ) {
160+ if (bufferedBytesCounter . getBufferedBytes () >= stripeMaxBytes ) {
159161 return ;
160162 }
161163
162164 // if the stripe is larger then the minimum stripe size, we are not required to convert any more dictionary columns to direct
163- if (bufferedBytes >= stripeMinBytes ) {
165+ if (bufferedBytesCounter . getBufferedBytes () >= stripeMinBytes ) {
164166 // check if we can get better compression by converting a dictionary column to direct. This can happen when then there are multiple
165167 // dictionary columns and one does not compress well, so if we convert it to direct we can continue to use the existing dictionaries
166168 // for the other columns.
167- double currentCompressionRatio = currentCompressionRatio (nonDictionaryBufferedBytes );
168- while (!directConversionCandidates .isEmpty () && bufferedBytes < stripeMaxBytes ) {
169- DictionaryCompressionProjection projection = selectDictionaryColumnToConvert (nonDictionaryBufferedBytes , stripeRowCount );
170- if (projection .getPredictedFileCompressionRatio () < currentCompressionRatio ) {
169+ double currentCompressionRatio = currentCompressionRatio (bufferedBytesCounter .getNonDictionaryBufferedBytes ());
170+ while (!directConversionCandidates .isEmpty () && bufferedBytesCounter .getBufferedBytes () < stripeMaxBytes ) {
171+ if (!convertDictionaryColumn (bufferedBytesCounter , stripeRowCount , OptionalDouble .of (currentCompressionRatio ))) {
171172 return ;
172173 }
173-
174- int selectDictionaryColumnBufferedBytes = toIntExact (projection .getColumnToConvert ().getBufferedBytes ());
175- OptionalInt directBytes = tryConvertToDirect (projection .getColumnToConvert (), getMaxDirectBytes (bufferedBytes ));
176- if (directBytes .isPresent ()) {
177- bufferedBytes = bufferedBytes + directBytes .getAsInt () - selectDictionaryColumnBufferedBytes ;
178- nonDictionaryBufferedBytes += directBytes .getAsInt ();
179- }
180174 }
181175 }
182176 }
183177
178+ private boolean convertDictionaryColumn (BufferedBytesCounter bufferedBytesCounter , int stripeRowCount , OptionalDouble currentCompressionRatio )
179+ {
180+ DictionaryCompressionProjection projection = selectDictionaryColumnToConvert (bufferedBytesCounter .getNonDictionaryBufferedBytes (), stripeRowCount );
181+ int index = projection .getDirectConversionCandidateIndex ();
182+ if (currentCompressionRatio .isPresent () && projection .getPredictedFileCompressionRatio () < currentCompressionRatio .getAsDouble ()) {
183+ return false ;
184+ }
185+
186+ DictionaryColumnManager column = directConversionCandidates .get (index );
187+ int dictionaryBytes = toIntExact (column .getBufferedBytes ());
188+
189+ OptionalInt directBytes = tryConvertToDirect (column , getMaxDirectBytes (bufferedBytesCounter .getBufferedBytes ()));
190+ removeDirectConversionCandidate (index );
191+ if (directBytes .isPresent ()) {
192+ bufferedBytesCounter .incrementBufferedBytes (directBytes .getAsInt () - dictionaryBytes );
193+ bufferedBytesCounter .incrementNonDictionaryBufferedBytes (directBytes .getAsInt ());
194+ }
195+ return true ;
196+ }
197+
184198 @ VisibleForTesting
185199 int convertLowCompressionStreams (int bufferedBytes )
186200 {
187201 // convert all low compression column to direct
188- for (DictionaryColumnManager dictionaryWriter : ImmutableList .copyOf (directConversionCandidates )) {
202+ Iterator <DictionaryColumnManager > iterator = directConversionCandidates .iterator ();
203+ while (iterator .hasNext ()) {
204+ DictionaryColumnManager dictionaryWriter = iterator .next ();
189205 if (dictionaryWriter .getCompressionRatio () < DICTIONARY_MIN_COMPRESSION_RATIO ) {
190206 int columnBufferedBytes = toIntExact (dictionaryWriter .getBufferedBytes ());
191207 OptionalInt directBytes = tryConvertToDirect (dictionaryWriter , getMaxDirectBytes (bufferedBytes ));
208+ iterator .remove ();
192209 if (directBytes .isPresent ()) {
193210 bufferedBytes = bufferedBytes + directBytes .getAsInt () - columnBufferedBytes ;
194211 if (bufferedBytes >= stripeMaxBytes ) {
@@ -206,14 +223,20 @@ private void updateDirectConversionCandidates()
206223 directConversionCandidates .removeIf (DictionaryColumnManager ::isDirectEncoded );
207224 }
208225
226+ private void removeDirectConversionCandidate (int index )
227+ {
228+ DictionaryColumnManager last = directConversionCandidates .get (directConversionCandidates .size () - 1 );
229+ directConversionCandidates .set (index , last );
230+ directConversionCandidates .remove (directConversionCandidates .size () - 1 );
231+ }
232+
209233 private OptionalInt tryConvertToDirect (DictionaryColumnManager dictionaryWriter , int maxDirectBytes )
210234 {
211235 int dictionaryBytes = dictionaryWriter .getDictionaryBytes ();
212236 OptionalInt directBytes = dictionaryWriter .tryConvertToDirect (maxDirectBytes );
213237 if (directBytes .isPresent ()) {
214238 dictionaryMemoryBytes -= dictionaryBytes ;
215239 }
216- directConversionCandidates .remove (dictionaryWriter );
217240 return directBytes ;
218241 }
219242
@@ -274,7 +297,8 @@ private DictionaryCompressionProjection selectDictionaryColumnToConvert(int tota
274297 long totalUncompressedBytesPerRow = totalNonDictionaryBytesPerRow + totalDictionaryRawBytesPerRow ;
275298
276299 DictionaryCompressionProjection maxProjectedCompression = null ;
277- for (DictionaryColumnManager column : directConversionCandidates ) {
300+ for (int index = 0 ; index < directConversionCandidates .size (); index ++) {
301+ DictionaryColumnManager column = directConversionCandidates .get (index );
278302 // determine the size of the currently written stripe if we were convert this column to direct
279303 long currentRawBytes = totalNonDictionaryBytes + column .getRawBytes ();
280304 long currentDictionaryBytes = totalDictionaryBytes - column .getDictionaryBytes ();
@@ -300,7 +324,7 @@ private DictionaryCompressionProjection selectDictionaryColumnToConvert(int tota
300324
301325 // convert the column that creates the best compression ratio
302326 if (maxProjectedCompression == null || maxProjectedCompression .getPredictedFileCompressionRatio () < predictedCompressionRatioAtLimit ) {
303- maxProjectedCompression = new DictionaryCompressionProjection (column , predictedCompressionRatioAtLimit );
327+ maxProjectedCompression = new DictionaryCompressionProjection (index , predictedCompressionRatioAtLimit );
304328 }
305329 }
306330 return maxProjectedCompression ;
@@ -461,23 +485,55 @@ public boolean isDirectEncoded()
461485
462486 private static class DictionaryCompressionProjection
463487 {
464- private final DictionaryColumnManager columnToConvert ;
488+ private final int directConversionIndex ;
465489 private final double predictedFileCompressionRatio ;
466490
467- public DictionaryCompressionProjection (DictionaryColumnManager columnToConvert , double predictedFileCompressionRatio )
491+ public DictionaryCompressionProjection (int directConversionIndex , double predictedFileCompressionRatio )
468492 {
469- this .columnToConvert = requireNonNull ( columnToConvert , "columnToConvert is null" ) ;
493+ this .directConversionIndex = directConversionIndex ;
470494 this .predictedFileCompressionRatio = predictedFileCompressionRatio ;
471495 }
472496
473- public DictionaryColumnManager getColumnToConvert ()
497+ public int getDirectConversionCandidateIndex ()
474498 {
475- return columnToConvert ;
499+ return directConversionIndex ;
476500 }
477501
478502 public double getPredictedFileCompressionRatio ()
479503 {
480504 return predictedFileCompressionRatio ;
481505 }
482506 }
507+
508+ private static class BufferedBytesCounter
509+ {
510+ private int bufferedBytes ;
511+ private int nonDictionaryBufferedBytes ;
512+
513+ public BufferedBytesCounter (int bufferedBytes , int nonDictionaryBufferedBytes )
514+ {
515+ this .bufferedBytes = bufferedBytes ;
516+ this .nonDictionaryBufferedBytes = nonDictionaryBufferedBytes ;
517+ }
518+
519+ public int getBufferedBytes ()
520+ {
521+ return bufferedBytes ;
522+ }
523+
524+ public void incrementBufferedBytes (int value )
525+ {
526+ bufferedBytes += value ;
527+ }
528+
529+ public int getNonDictionaryBufferedBytes ()
530+ {
531+ return nonDictionaryBufferedBytes ;
532+ }
533+
534+ public void incrementNonDictionaryBufferedBytes (int value )
535+ {
536+ nonDictionaryBufferedBytes += value ;
537+ }
538+ }
483539}
0 commit comments