Próbując głębiej zrozumieć, jak działa spark, grałem z interfejsem wiersza polecenia pyspark (2.4.0). Szukałem różnicę między użyciem limit(n).show()
i show(n)
. W końcu dostałem dwa bardzo różne wykonawcze dla dwóch bardzo podobnych wniosków. Poniżej znajdują się polecenia, które wykonałem. Plik parquet, na który powołuje się poniższy kod zawiera około 50 kolumn i ma rozmiar ponad 50 GB w zdalnych HDFS.
# Create dataframe
>>> df = sqlContext.read.parquet('hdfs://hdfs.host/path/to.parquet') ↵
# Create test1 dataframe
>>> test1 = df.select('test_col') ↵
>>> test1.schema ↵
StructType(List(StructField(test_col,ArrayType(LongType,true),true)))
>>> test1.explain() ↵
== Physical Plan ==
*(1) Project [test_col#40]
+- *(1) FileScan parquet [test_col#40]
Batched: false,
Format: Parquet,
Location: InMemoryFileIndex[hdfs://hdfs.host/path/to.parquet],
PartitionCount: 25,
PartitionFilters: [],
PushedFilters: [],
ReadSchema: struct<test_col:array<bigint>>
# Create test2 dataframe
>>> test2 = df.select('test_col').limit(5) ↵
>>> test2.schema ↵
StructType(List(StructField(test_col,ArrayType(LongType,true),true)))
>>> test2.explain() ↵
== Physical Plan ==
CollectLimit 5
+- *(1) Project [test_col#40]
+- *(1) FileScan parquet [test_col#40]
Batched: false,
Format: Parquet,
Location: InMemoryFileIndex[hdfs://hdfs.host/path/to.parquet],
PartitionCount: 25,
PartitionFilters: [],
PushedFilters: [],
ReadSchema: struct<test_col:array<bigint>>
Należy zwrócić uwagę, że plan fizyczny prawie identyczny dla obu test1
i test2
. Jedynym wyjątkiem jest to, że plan test2 zaczyna się od "CollectLimit 5". Po skonfigurowaniu tego pobiegłem test1.show(5)
i test2.show(5)
. Test 1 przywrócił wyniki natychmiast. Test 2 pokazał wskaźnik wykonania zadań 2010 roku i trwał około 20 minut (u mnie był tylko jeden wykonawca).
Pytanie Dlaczego test 2 (z ograniczeniem) pokazał takie złe wyniki w porównaniu z testem 1 (bez ograniczeń)? Zestaw danych i zestaw wyniki były identyczne, a plan fizyczny był prawie identyczny.