Problème :
Je voudrais faire une jonction spatiale entre :
- Un grand Dataframe Spark (500M rows) avec points (par exemple, des points sur une route)
- un petit geojson (20000 formes) avec polygones (par exemple, les limites des régions).
Voici ce que j'ai jusqu'à présent, qui me semble lent (beaucoup de retard dans l'ordonnancement, peut-être dû au fait que communes n'est pas diffusée) :
@pandas_udf(schema_out, PandasUDFType.GROUPED_MAP)
def join_communes(traces):
geometry = gpd.points_from_xy(traces['longitude'], traces['latitude'])
gdf_traces = gpd.GeoDataFrame(traces, geometry=geometry, crs = communes.crs)
joined_df = gpd.sjoin(gdf_traces, communes, how='left', op='within')
return joined_df[columns]
La fonction pandas_udf prend en compte un peu de la fonction points (traces) en tant que dataframe pandas, le transforme en GeoDataFrame avec des geopandas, et opère la jointure spatiale avec la fonction polygones GeoDataFrame (bénéficiant ainsi de la jointure Rtree des Geopandas)
Questions :
Existe-t-il un moyen de le rendre plus rapide ? Je comprends que mon communes geodataframe est dans la mémoire du pilote Spark et que chaque travailleur doit le télécharger à chaque appel à l'udf, est-ce correct ?
Cependant, je ne sais pas comment je pourrais rendre ce GeoDataFrame disponible directement aux travailleurs (comme dans une jointure de diffusion).
Des idées ?