6 votes

Jointure spatiale entre les dataframes pyspark et les polygones (geopandas)

Problème :

Je voudrais faire une jonction spatiale entre :

  • Un grand Dataframe Spark (500M rows) avec points (par exemple, des points sur une route)
  • un petit geojson (20000 formes) avec polygones (par exemple, les limites des régions).

Voici ce que j'ai jusqu'à présent, qui me semble lent (beaucoup de retard dans l'ordonnancement, peut-être dû au fait que communes n'est pas diffusée) :

@pandas_udf(schema_out, PandasUDFType.GROUPED_MAP)
def join_communes(traces):   
    geometry = gpd.points_from_xy(traces['longitude'], traces['latitude'])
    gdf_traces = gpd.GeoDataFrame(traces, geometry=geometry, crs = communes.crs)
    joined_df = gpd.sjoin(gdf_traces, communes, how='left', op='within')
    return joined_df[columns]

La fonction pandas_udf prend en compte un peu de la fonction points (traces) en tant que dataframe pandas, le transforme en GeoDataFrame avec des geopandas, et opère la jointure spatiale avec la fonction polygones GeoDataFrame (bénéficiant ainsi de la jointure Rtree des Geopandas)

Questions :

Existe-t-il un moyen de le rendre plus rapide ? Je comprends que mon communes geodataframe est dans la mémoire du pilote Spark et que chaque travailleur doit le télécharger à chaque appel à l'udf, est-ce correct ?

Cependant, je ne sais pas comment je pourrais rendre ce GeoDataFrame disponible directement aux travailleurs (comme dans une jointure de diffusion).

Des idées ?

1voto

Luis Blanche Points 352

Un an après, voici ce que j'ai fini par faire en tant que @ndricca suggéré, l'astuce consiste à diffuser les communes, mais on ne peut pas diffuser une GeoDataFrame Vous devez donc le charger en tant que DataFrame Spark, puis le convertir en JSON avant de le diffuser. Ensuite, vous reconstruisez le GeoDataFrame à l'intérieur de l'UDF en utilisant shapely.wkt (Well Known Text : une façon d'encoder des objets géométriques en tant que texte)

Une autre astuce consiste à utiliser un sel dans le groupby pour garantir une répartition égale des données sur le cluster.

import geopandas as gpd
from shapely import wkt
from pyspark.sql.functions import broadcast
communes = gpd.load_file('...communes.geojson')
# Use a previously created spark session
traces= spark_session.read_csv('trajectoires.csv')
communes_spark = spark.createDataFrame(communes[['insee_comm', 'wkt']])
communes_json = provinces_spark.toJSON().collect()
communes_bc = spark.sparkContext.broadcast(communes_json)

@pandas_udf(schema_out, PandasUDFType.GROUPED_MAP)
def join_communes_bc(traces):
    communes = pd.DataFrame.from_records([json.loads(c) for c in communes_bc.value])
    polygons = [wkt.loads(w) for w in communes['wkt']]
    gdf_communes = gpd.GeoDataFrame(communes, geometry=polygons, crs=crs )
    geometry = gpd.points_from_xy(traces['longitude'], traces['latitude'])
    gdf_traces = gpd.GeoDataFrame(traces , geometry=geometry, crs=crs)
    joined_df = gpd.sjoin(gdf_traces, gdf_communes, how='left', op='within')
    return joined_df[columns]

traces = traces.groupby(salt).apply(join_communes_bc)

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X