Apache Sedona makes it easy to read GeoJSON, Shapefile, GeoPackage and other spatial formats, and it provides a rich set of Spark SQL functions and RDD operators. The example below is mainly intended as practice for getting familiar with Spark and Sedona.
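As a quick illustration of those two APIs, the minimal sketch below reads a shapefile through the RDD API and then queries it with a Sedona SQL function. It is only a sketch: the shapefile directory data/roads_shapefile and the class name SedonaQuickLook are hypothetical, and it assumes a Sedona release that ships SedonaContext (1.4.1+), matching the dependencies listed further down.

import org.apache.sedona.core.formatMapper.shapefileParser.ShapefileReader;
import org.apache.sedona.core.spatialRDD.SpatialRDD;
import org.apache.sedona.spark.SedonaContext;
import org.apache.sedona.sql.utils.Adapter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.locationtech.jts.geom.Geometry;

public class SedonaQuickLook {
    public static void main(String[] args) {
        SparkSession sedona = SedonaContext.create(
                SedonaContext.builder().master("local[*]").appName("quickLook").getOrCreate());
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sedona.sparkContext());

        // RDD operator side: load a shapefile directory into a SpatialRDD
        // ("data/roads_shapefile" is a placeholder path).
        SpatialRDD<Geometry> shpRdd = ShapefileReader.readToGeometryRDD(jsc, "data/roads_shapefile");

        // SQL side: convert to a DataFrame and apply a Sedona SQL function to the geometry column.
        Dataset<Row> df = Adapter.toDf(shpRdd, sedona);
        df.createOrReplaceTempView("roads");
        sedona.sql("SELECT ST_Length(geometry) AS len FROM roads").show();

        sedona.stop();
    }
}

The full example that follows sticks to the DataFrame/SQL side.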
The Maven dependencies:
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>GeoJsonToMvt</artifactId><version>0.1</version><packaging>jar</packaging><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><scala.version>2.13.8</scala.version><geotrellis.version>3.7.1</geotrellis.version><spark.version>3.3.4</spark.version><spray.json.version>1.3.6</spray.json.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.13</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.13</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.13</artifactId><version>${spark.version}</version></dependency><dependency><groupId>io.spray</groupId><artifactId>spray-json_2.13</artifactId><version>${spray.json.version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.13.4</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.13.4</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-annotations</artifactId><version>2.13.4</version></dependency><dependency><groupId>org.locationtech.jts</groupId><artifactId>jts-core</artifactId><version>1.19.0</version></dependency><dependency><groupId>org.geotools</groupId><artifactId>gt-main</artifactId><version>25.4</version></dependency><dependency><groupId>org.geotools</groupId><artifactId>gt-epsg-hsql</artifactId><version>25.4</version></dependency><dependency><groupId>org.apache.sedona</groupId><artifactId>sedona-viz-3.0_2.13</artifactId><version>1.4.1</version></dependency><dependency><groupId>org.apache.sedona</groupId><artifactId>sedona-core-3.0_2.13</artifactId><version>1.4.1</version></dependency><dependency><groupId>org.apache.sedona</groupId><artifactId>sedona-sql-3.0_2.13</artifactId><version>1.4.1</version></dependency><dependency><groupId>org.apache.sedona</groupId><artifactId>sedona-spark-3.0_2.13</artifactId> <!-- 替换为你的 Spark 和 Scala 版本 --><version>1.6.1</version> <!-- 替换为你的 Sedona 版本 --></dependency><dependency><groupId>org.locationtech.proj4j</groupId><artifactId>proj4j</artifactId><version>1.1.0</version></dependency></dependencies><repositories><repository><id>osgeo</id><name>OSGeo Release Repository</name><url>https://repo.osgeo.org/repository/release/</url></repository></repositories><build><plugins><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin></plugins></build>
</project>
Below is the Spark code implemented in Java; part of it was generated by AI (Gemini 2.0).
import org.apache.sedona.spark.SedonaContext;
import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.geotools.referencing.CRS;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.io.WKTReader;
import scala.Tuple3;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static java.lang.Math.cos;
import static java.lang.Math.floor;
import static java.lang.Math.pow;
import static java.lang.Math.toRadians;
import static org.apache.spark.sql.functions.*;

public class GeoJsonToTilesJava {

    public static void main(String[] args) throws Exception {
        System.setProperty("org.geotools.referencing.forceXY", "true");

        // Create the SparkSession
        SparkSession config = SedonaContext.builder()
                .master("local[*]") // Delete this if run in cluster mode
                .appName("readTestJava") // Change this to a proper name
                .config("spark.kryo.registrator", SedonaVizKryoRegistrator.class.getName())
                .config("spark.sql.extensions",
                        "org.apache.sedona.viz.sql.SedonaVizExtensions,org.apache.sedona.sql.SedonaSqlExtensions")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .getOrCreate();
        SparkSession sedona = SedonaContext.create(config);

        Dataset<Row> df = sedona.read().format("geojson")
                .option("multiPolygon", "true")
                .load("src/main/resources/guangdong.json")
                .selectExpr("explode(features) as features") // Explode the collection to get one feature per row.
                .select("features.*")                        // Unpack the features struct.
                .withColumn("name", expr("properties['name']"))
                .drop("properties")
                .drop("type");
        df.show();
        //Dataset<Row> filteredDF = df.filter(expr("ST_Y(ST_Centroid(ST_GeomFromGeoJSON(geometry))) BETWEEN -85.05 AND 85.05"));

        // Build the coordinate transformation (EPSG:4326 -> EPSG:3857)
        try {
            Dataset<Row> geomDF = df.withColumn("sedona_geom",
                    expr("ST_AsEWKT(ST_Transform(geometry,'epsg:4326','epsg:3857',true))"));
            // Alternative: create the Sedona geometry column via a UDF
            //Dataset<Row> geomDF = df.withColumn("sedona_geom", callUDF("createSedonaGeometry", col("geometry")));
            geomDF.show();

            // getTileIndexUDF returns an (x, y, z) struct; a null or empty centroid yields a null struct.
            org.apache.spark.sql.api.java.UDF2<String, Integer, Row> getTileIndexUDF = (centroidStr, zoom) -> {
                if (centroidStr == null || centroidStr.isEmpty()) {
                    return null;
                } else {
                    String[] str = centroidStr.replace("POINT (", "").replace(")", "").split(" ");
                    List<Double> coordinates = new ArrayList<>();
                    coordinates.add(Double.parseDouble(str[0]));
                    coordinates.add(Double.parseDouble(str[1]));
                    Tuple3<Integer, Integer, Integer> tileIndex = getTileIndex(coordinates, zoom);
                    return RowFactory.create(tileIndex._1(), tileIndex._2(), tileIndex._3());
                }
            };
            //CRS.decode("EPSG:4326", true);

            // Register the UDF
            StructType tileIndexSchema = new StructType(new StructField[]{
                    new StructField("x", DataTypes.IntegerType, true, Metadata.empty()),
                    new StructField("y", DataTypes.IntegerType, true, Metadata.empty()),
                    new StructField("z", DataTypes.IntegerType, true, Metadata.empty())
            });
            sedona.udf().register("getTileIndexUDF", getTileIndexUDF, tileIndexSchema);

            // getCentroid parses the geometry WKT and returns the centroid as WKT text
            org.apache.spark.sql.api.java.UDF1<String, String> getCentroid = (geomStr) -> {
                if (geomStr == null || geomStr.isEmpty()) {
                    return null;
                } else {
                    GeometryFactory factory = new GeometryFactory();
                    WKTReader wktReader = new WKTReader(factory);
                    org.locationtech.jts.geom.Geometry geom = wktReader.read(geomStr);
                    return geom.getCentroid().toString();
                }
            };
            // Register the UDF
            sedona.udf().register("getCentroid", getCentroid, DataTypes.StringType);

            List<Integer> zooms = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
            // Key point: loop directly over the original geomDF for every zoom level
            for (Integer zoom : zooms) {
                Dataset<Row> tilesDF = geomDF
                        .withColumn("centroid", callUDF("getCentroid", col("sedona_geom")))
                        .withColumn("tile", callUDF("getTileIndexUDF", col("centroid"), lit(zoom)));
                tilesDF.show();

                Dataset<Row> groupedTiles = tilesDF.filter(col("tile").isNotNull())
                        .groupBy(col("tile"))
                        .agg(collect_list("sedona_geom").alias("features"));
                groupedTiles.write().format("json").mode("overwrite").save("D:/temp/output/zoom_" + zoom);
            }
            sedona.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Compute the Web Mercator (EPSG:3857) tile index for a point at the given zoom level.
    private static Tuple3<Integer, Integer, Integer> getTileIndex(List<Double> coordinates, int zoom) {
        if (coordinates == null || coordinates.isEmpty()) {
            return null;
        } else {
            double x = coordinates.get(0);
            double y = coordinates.get(1);
            // Resolution (metres per pixel) at this zoom level for 256-pixel tiles.
            double res = 156543.03392 * cos(toRadians(0)) / pow(2, zoom);
            int tileX = (int) floor((x + 20037508.34) / (res * 256));
            int tileY = (int) floor((20037508.34 - y) / (res * 256));
            return new Tuple3<>(tileX, tileY, zoom);
        }
    }
}
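The per-zoom loop above round-trips every geometry through WKT text and a JTS WKTReader just to get its centroid. Since Sedona SQL already ships ST_Centroid and ST_AsText (see the function reference linked below), the same step can be expressed without the getCentroid UDF. The helper below is an untested sketch of that idea: it assumes geomDF keeps the transformed geometry as a real geometry-typed column named geom3857 (i.e. the ST_Transform expression without the ST_AsEWKT wrapper), reuses the getTileIndexUDF registration from the main example, and the method name writeTilesWithSqlCentroid is hypothetical.

// Hypothetical helper for GeoJsonToTilesJava: same tiling logic, but the centroid is
// computed with Sedona SQL functions instead of a JTS WKTReader round-trip.
// Assumes geomDF has a geometry-typed column "geom3857" and getTileIndexUDF is registered.
private static void writeTilesWithSqlCentroid(Dataset<Row> geomDF, int zoom) {
    Dataset<Row> tilesDF = geomDF
            // ST_Centroid/ST_AsText are built-in Sedona SQL functions; the result is
            // a "POINT (x y)" string that getTileIndexUDF can parse as before.
            .withColumn("centroid", expr("ST_AsText(ST_Centroid(geom3857))"))
            .withColumn("tile", callUDF("getTileIndexUDF", col("centroid"), lit(zoom)));

    tilesDF.filter(col("tile").isNotNull())
            .groupBy(col("tile"))
            .agg(collect_list(expr("ST_AsText(geom3857)")).alias("features"))
            .write().format("json").mode("overwrite")
            .save("D:/temp/output/zoom_" + zoom);
}

Inside the main loop it would be called as writeTilesWithSqlCentroid(geomDF, zoom) instead of building tilesDF inline.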
References
https://sedona.apache.org/1.7.0/tutorial/sql/
https://sedona.apache.org/1.7.0/api/sql/Function/
Google Gemini AI: https://aistudio.google.com/