Edit: Progressing into this problem analysis, the management of the dataset nested values with tools that Spark offers since 3.1.x seems required.
→ It's my manner of filling nested objects hold by objects carried by a Dataset that is currently wrong.
I have a clumsy workaround, and there's a theoretically solution through How to transform rows with Spark to a row with a map Column leading me to something starting like this:
Dataset<Row> datasetRowPerimetres = [...]
return groupBy(datasetRowPerimetres)
.agg(collect_list("nomCommuneMembre").alias("nomCommuneMembre"))
.withColumn("mG", map(new Column("sirenGroupement"), new Column("nomCommuneMembre")))
.drop("nomCommuneMembre");
that shows me a map mG like that:
| sirenGroupement | mG |
|---|---|
| 200000198 | {200000198 -> [Beaumont, Ceyrat, Saint-Genès-Champanelle]} |
- the map key should be replaced by another one
- and cities names by their cities objects.
Code would become clumsy, with drawbacks. I'm abandoning this and return to separate Datasets having objects without array, set or map. At the cost their users will join them by their keys if needed.
I often have the problem to dispatch a list or a set of children objects C into their P parent, and return a list of P objects where each P is filled with its C objects.
But it's harder to return Dataset<P> having these ones.
I've tried to solve this problem through a custom user aggregation function (Spark 3.5.6).
I'm starting from an open data source CSV, that contains records for French local authorities completed by the cities that are their members:
L1, C1
L1, C2
L1, C3
L2, C4
L2, C5
[...]
From the CSV file, I've extracted primitives fields and composed few business objects. Each record depicts what is called a Perimeter: a working/DTO object that is a the description of a member of a local authority.
In example, Perimeter objects are for the "Communauté Intercommunale des Villes Solidaires (CA)" local authority, that has six cities, these ones:
| categorieMembresGroupement | communeMembreGroupement | competencesExercees | densiteDemographiqueHabitantsAuKilometreCarre | etablissementPublicTerritorialDeBassin | fiscaliteFinancement | fiscalitePropre | gestionDesEaux | interdepartemental | membreAdherentSyndicatMixte | natureJuridique | nomGroupement | nombreDeCommunesMmebres | organigramme | population | redevanceEnlevementOrduresMenageres | siege | siren | taxeEnlevementOrduresMenageres | zoneDeMontagne |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| commune | {97401, {974}, {04}, Les Avirons, 218, 11445, 11663, 219740016} | {NULL, {C1020 -> {C1020, false, false, NULL}, C1505 -> {C1505, false, false, NULL}, [...] | 491.7 | NULL | {NULL, 4108814, NULL, 9325112, 5216298, FPU, NULL, NULL, NULL, NULL, NULL, NULL} | true | NULL | false | {Ile De La Réunion Mobilités, 891190, 200045276} | COMMUNAUTE_AGGLOMERATION | CA CIVIS (Communauté Intercommunale des Villes Solidaires) | 6 | {M., RDC, , LORION, 70, David, NULL} | 185848 | false | {{ , NULL, NULL, NULL, 97410,[...] [...] | 249740077 | true | false |
| commune | {97404, {974}, {04}, L'Étang-Salé, 211, 14329, 14540, 219740040} | {NULL, {C1020 -> {C1020, false, false, NULL}, C1505 -> {C1505, false, false, NULL}, [...] | 491.7 | NULL | {NULL, 4108814, NULL, 9325112, 5216298, FPU, NULL, NULL, NULL, NULL, NULL, NULL} | true | NULL | false | {SM de Pierrefonds, 356662, 259741007} | COMMUNAUTE_AGGLOMERATION | CA CIVIS (Communauté Intercommunale des Villes Solidaires) | 6 | {M., RDC, , LORION, 70, David, NULL} | 185848 | false | {{ , NULL, NULL, NULL, 97410,[...] [...] | 249740077 | true | false |
| commune | {97405, {974}, {04}, Petite-Île, 171, 12920, 13091, 219740057} | {NULL, {C1020 -> {C1020, false, false, NULL}, C1505 -> {C1505, false, false, NULL}, [...] | 491.7 | NULL | {NULL, 4108814, NULL, 9325112, 5216298, FPU, NULL, NULL, NULL, NULL, NULL, NULL} | true | NULL | false | {SAEP des Hirondelles, 320612, 200101079} | COMMUNAUTE_AGGLOMERATION | CA CIVIS (Communauté Intercommunale des Villes Solidaires) | 6 | {M., RDC, , LORION, 70, David, NULL} | 185848 | false | {{ , NULL, NULL, NULL, 97410,[...] [...] | 249740077 | true | false |
| commune | {97414, {974}, {04}, Saint-Louis, 676, 54478, 55154, 219740149} | {NULL, {C1020 -> {C1020, false, false, NULL}, C1505 -> {C1505, false, false, NULL}, [...] | 491.7 | NULL | {NULL, 4108814, NULL, 9325112, 5216298, FPU, NULL, NULL, NULL, NULL, NULL, NULL} | true | NULL | false | {SM d'études et de programmation du SCOT du Grand Sud, 320612, 259741080} | COMMUNAUTE_AGGLOMERATION | CA CIVIS (Communauté Intercommunale des Villes Solidaires) | 6 | {M., RDC, , LORION, 70, David, NULL} | 185848 | false | {{ , NULL, NULL, NULL, 97410,[...] [...] | 249740077 | true | false |
| commune | {97416, {974}, {04}, Saint-Pierre, 886, 85254, 86140, 219740164} | {NULL, {C1020 -> {C1020, false, false, NULL}, C1505 -> {C1505, false, false, NULL}, [...] | 491.7 | NULL | {NULL, 4108814, NULL, 9325112, 5216298, FPU, NULL, NULL, NULL, NULL, NULL, NULL} | true | NULL | false | {SM de traitement des déchets des microrégions Sud et Ouest de La Réunion dénommé ILEVA, 542144, 200045342} | COMMUNAUTE_AGGLOMERATION | CA CIVIS (Communauté Intercommunale des Villes Solidaires) | 6 | {M., RDC, , LORION, 70, David, NULL} | 185848 | false | {{ , NULL, NULL, NULL, 97410,[...] [...] | 249740077 | true | false |
| commune | {97424, {974}, {04}, Cilaos, 45, 5215, 5260, 219740248} | {NULL, {C1020 -> {C1020, false, false, NULL}, C1505 -> {C1505, false, false, NULL}, [...] | 491.7 | NULL | {NULL, 4108814, NULL, 9325112, 5216298, FPU, NULL, NULL, NULL, NULL, NULL, NULL} | true | NULL | false | { , NULL, } | COMMUNAUTE_AGGLOMERATION | CA CIVIS (Communauté Intercommunale des Villes Solidaires) | 6 | {M., RDC, , LORION, 70, David, NULL} | 185848 | false | {{ , NULL, NULL, NULL, 97410,[...] [...] | 249740077 | true | false |
communeMembreGroupement and membreAdherentSyndicatMixte being the business objects varying for each Perimeter owned by the same local authority.
And here, I need to start creating objects of Groupement class, depicting these local authorities, each Groupement having for variable member an object MembresGroupement listing its member cities.
A Groupement has for structure:
root
|-- competencesExercees: struct
| |-- competenceConservee: boolean
| |-- competences: map
| | |-- key: string
| | |-- value: struct (valueContainsNull = true)
| | | |-- code: string
| | | |-- deleguee: boolean
| | | |-- interetCommunautaire: boolean
| | | |-- obligatoire: boolean
| |-- nombreDeCompetencesExercees: integer
| |-- syndicatALaCarte: boolean
|-- densiteDemographiqueHabitantsAuKilometreCarre: double
|-- etablissementPublicTerritorialDeBassin: boolean
|-- fiscaliteFinancement: struct
| |-- dgfParHabitant: double
| |-- dotationDeCompensation: integer
| |-- dotationDesGroupementsTouristiques: integer
| |-- dotationGlobale: integer
| |-- dotationIntercommunalite: integer
| |-- modeDeFinancement: string
| |-- obsoleteAutreRedevance: integer
| |-- obsoleteAutreTaxe: integer
| |-- obsoleteDGFBonifiee: integer
| |-- obsoleteDSC: integer
| |-- populationDGF: integer
| |-- potentielFiscal: integer
|-- fiscalitePropre: boolean (nullable = false)
|-- gestionDesEaux: boolean
|-- interdepartemental: boolean
|-- membresGroupement: map
| |-- key: string
| |-- value: struct (valueContainsNull = true)
| | |-- categorieMembresGroupement: string
| | |-- communeMembreGroupement: struct
| | | |-- codeCommune: string
| | | |-- codeDepartementCommune: struct
| | | | |-- id: string
| | | |-- codeRegionCommune: struct
| | | | |-- id: string
| | | |-- nomCommune: string
| | | |-- populationCommuneCompteAPart: integer
| | | |-- populationCommuneMunicipale: integer
| | | |-- populationCommuneTotale: integer
| | | |-- sirenCommune: string
| | |-- membreAdherentSyndicatMixte: struct
| | | |-- nomAdherent: string
| | | |-- populationAdherente: integer
| | | |-- sirenAdherent: string
| | |-- sirenGroupement: string
|-- natureJuridique: string
|-- nomGroupement: string
|-- nombreDeCommunesMmebres: integer
|-- organigramme: struct
| |-- civilite: string
| |-- modeDeRepartitionDesSieges: string
| |-- modeDeRepartitionDesSiegesAutre: string
| |-- nomPresident: string
| |-- nombreDeDelegues: integer
| |-- prenomPresident: string
| |-- representationSubstitution: string
|-- population: integer
|-- redevanceEnlevementOrduresMenageres: boolean
|-- siege: struct
| |-- adresseAdministrative: struct
| | |-- bureauDistributeur: string
| | |-- cedex: string
| | |-- codeCommune: struct
| | | |-- id: string
| | |-- codePaysEtranger: string
| | |-- codePostal: string
| | |-- commentaire: string
| | |-- complementNom: string
| | |-- complementVoie: string
| | |-- libelleCedex: string
| | |-- nom: string
| | |-- nomPaysEtranger: string
| | |-- numeroVoie: string
| | |-- repetitionNumeroDansVoie: string
| | |-- typeVoie: string
| | |-- ville: string
| | |-- voie: string
| |-- adresseSiege: struct
| | |-- bureauDistributeur: string
| | |-- cedex: string
| | |-- codeCommune: struct
| | | |-- id: string
| | |-- codePaysEtranger: string
| | |-- codePostal: string
| | |-- commentaire: string
| | |-- complementNom: string
| | |-- complementVoie: string
| | |-- libelleCedex: string
| | |-- nom: string
| | |-- nomPaysEtranger: string
| | |-- numeroVoie: string
| | |-- repetitionNumeroDansVoie: string
| | |-- typeVoie: string
| | |-- ville: string
| | |-- voie: string
| |-- codeCommune: string
| |-- codeDepartementCommune: struct
| | |-- id: string
| |-- codeRegionCommune: struct
| | |-- id: string
| |-- communeSiege: struct
| | |-- arrondissement: string
| | |-- codeCommune: string
| | |-- codeDepartementCommune: struct
| | | |-- id: string
| | |-- codeRegionCommune: struct
| | | |-- id: string
| | |-- commune: struct
| | | |-- arrondissement: string
| | | |-- codeCanton: string
| | | |-- codeCommune: string
| | | |-- codeCommuneParente: string
| | | |-- codeDepartement: string
| | | |-- codeEPCI: string
| | | |-- codeRegion: string
| | | |-- latitude: double
| | | |-- longitude: double
| | | |-- natureJuridiqueEPCI: string
| | | |-- nomCommune: string
| | | |-- nomDepartement: string
| | | |-- nomEPCI: string
| | | |-- nomMajuscules: string
| | | |-- population: integer
| | | |-- sirenCommune: string
| | | |-- strateCommune: integer
| | | |-- surface: double
| | | |-- typeCommune: string
| | | |-- typeNomEtCharniere: string
| | |-- nomCommune: string
| | |-- populationCommuneCompteAPart: integer
| | |-- populationCommuneMunicipale: integer
| | |-- populationCommuneTotale: integer
| | |-- sirenCommune: string
| |-- dateCreation: date
| |-- dateEffet: date
| |-- email: string
| |-- faxAdministratif: string
| |-- faxSiege: string
| |-- siteInternet: string
| |-- telephoneAdministratif: string
| |-- telephoneSiege: string
|-- siren: string
|-- taxeEnlevementOrduresMenageres: boolean
|-- zoneDeMontagne: boolean
I've created an Aggregator class:
/**
* Agrégateur de {@link Perimetre} en {@link Groupement}
* @author Marc Le Bihan
*/
public class GroupementAggregator extends Aggregator<Perimetre, Groupement, Groupement> {
@Serial
private static final long serialVersionUID = 1861686194387336535L;
/**
* Renvoyer une valeur neutre pour cette aggrégation
* @return groupement vide
*/
@Override
public Groupement zero() {
Groupement zero = new Groupement();
zero.setNatureJuridique(NatureJuridiqueGroupement.METROPOLE_LYON);
zero.setMembresGroupement(new MembresGroupement());
return zero;
}
/**
* Combine deux valeurs pour en créer une autre (il est possible qu'elle modifie l'objet {@link Groupement} soumis plutôt que d'en créer un autre.
* @param groupement Groupement à réduire (cumuler dedans)
* @param perimetre Périmètre à considérer
* @return Groupement réduit
*/
@Override
public Groupement reduce(Groupement groupement, Perimetre perimetre) {
// Si le groupement que l'on nous soumet est vide, il s'agit de celui créé par la fonction zero.
// Mais il ne nous intéresse pas, et nous préférons recréer un vrai groupement, une première fois, d'après le périmètre reçu.
Groupement groupementReduit = groupement.getSiren() == null ? new Groupement(perimetre) : new Groupement(groupement);
if (groupementReduit.getMembresGroupement() == null) {
groupementReduit.setMembresGroupement(new MembresGroupement());
}
MembreGroupement membre = new MembreGroupement(perimetre);
groupementReduit.getMembresGroupement().add(membre);
return groupementReduit;
}
/**
* Fusionner deux valeurs intermédiaires
* @param a Premier {@link Groupement}
* @param b Deuxième {@link Groupement}
* @return Groupement déduit
*/
@Override
public Groupement merge(Groupement a, Groupement b) {
// Si l'un des groupements est le zéro, retourner l'autre
if (a.getSiren() == null) {
return b;
}
if (b.getSiren() == null) {
return a;
}
// FIXME : Fait un aggregate ici, mais sans tenir compte d'un group by qu'il faudrait faire avant
a.getMembresGroupement().addAll(b.getMembresGroupement());
return a;
}
/**
* Transformer la sortie après réduction
* @param reduction Contenu réduit
* @return {@link Groupement} final
*/
@Override
public Groupement finish(Groupement reduction) {
// Pas de modification à apporter à l'objet final
return reduction;
}
/**
* Renvoyer l'encodeur du type intermédiaire
* @return Encodeur de {@link Groupement}
*/
@Override
public Encoder<Groupement> bufferEncoder() {
return Encoders.bean(Groupement.class);
}
/**
* Renvoyer l'encodeur du type final
* @return Encodeur de {@link Groupement}
*/
@Override
public Encoder<Groupement> outputEncoder() {
return Encoders.bean(Groupement.class);
}
}
And I'm using it this way:
public Dataset<Groupement> datasetGroupementsAvecMembres(Dataset<Perimetre> datasetPerimetre) {
GroupementAggregator agreggateurGroupement = new GroupementAggregator();
TypedColumn<Perimetre, Groupement> groupements = agreggateurGroupement.toColumn().name("mG");
return datasetPerimetre.select(groupements);
}
You can see that it is lacking a groupBy(...) call that I can't yet find how to introduce. But my current problem is its behavior. When I'm debugging it:
It calls a lot of times the
zero()method of theAggregator.Its reduce function looks working the way I like: the
membreGroupementsvariable (should be renamedmembresGroupementto be more correct) of thegroupementobject being reduced is well filled:My
merge(...)method, however, is always receiving onezeroobject one side, each time it is called. So it isn't merging anything in fact, I think.Eventually, my
finish(...)method received agroupementobject that has no cities members.
As a workaround, I've succeeded in creating such P having a Map<P.key, C> member, another way.
But not perfectly. It was creating the objects, but not really a Dataset<P> having its C children inside:
I'm filling a Groupements that is a LinkedHashSet of Groupement by SIREN (String) key, using this method:
public Groupements groupementsAvecMembres(Dataset<Row> datasetRowPerimetres, int anneeCOG) {
Dataset<Perimetre> datasetPerimetres = this.datasetPerimetre.datasetPerimetres(datasetRowPerimetres, anneeCOG);
Dataset<Groupement> datasetGroupements = datasetGroupementsSansMembres(datasetRowPerimetres, anneeCOG);
Dataset<MembreGroupement> datasetMembresGroupements = this.datasetPerimetre.datasetMembresGroupements(datasetPerimetres);
return super.declinaison(new Groupements(),
datasetGroupements, datasetGroupements.col("siren"), Groupement::getSiren,
datasetMembresGroupements, datasetMembresGroupements.col("siren"), m -> m.getCommuneMembreGroupement().getSirenCommune(),
Groupement::getMembresGroupement);
}
with:
public <P extends Serializable, KP, E extends Serializable, KE, C extends Map<KP, P>> C declinaison(C ensemble,
Dataset<P> parents, Column columnJoinP, Function<P, KP> obtenirClefDuParent,
Dataset<E> enfants, Column columnJoinE, Function<E, KE> obtenirClefEnfant,
Function<P, Map<KE, E>> obtenirMapEnfants) {
Dataset<Tuple2<P, E>> ds = parents.joinWith(enfants, columnJoinP.equalTo(columnJoinE), "inner");
for(Tuple2<P, E> tuple : ds.collectAsList()) {
// Rechercher l'objet parent par KP, et l'ajouter à l'ensemble C, vide, s'il n'existe pas.
P source = tuple._1();
KP clefSourceParent = obtenirClefDuParent.apply(source);
P parent = ensemble.computeIfAbsent(clefSourceParent, clef -> tuple._1());
// Dans cet objet parent P, rechercher la liste des enfants E, indexée par KE, et y ajouter notre instance de E.
Map<KE, E> ensembleEnfants = obtenirMapEnfants.apply(parent);
E nouvelEnfant = tuple._2();
KE clefEnfant = obtenirClefEnfant.apply(nouvelEnfant);
ensembleEnfants.put(clefEnfant, nouvelEnfant);
}
return ensemble;
}
and this test shows that it works:
// Parametrized by annotation
String condition = String.format("sirenGroupement = %s", sirenGroupement);
Dataset<Row> rowPerimetres = datasetRowPerimetres(annee).where(condition);
Groupements groupements = this.groupementsDataset.groupementsAvecMembres(rowPerimetres, annee);
LOGGER.info("Groupements avec membres :");
assertNotEquals(0, groupements.size(), "Au moins un groupement avec membres aurait dû être trouvé pour " + nom);
LOGGER.info("{}", groupements);
shows:
Groupements avec membres :
2025-08-08 14:39:40.192 INFO 233869 --- [ main] f.e.a.o.s.d.g.GroupementsITCase :
nom : CA CIVIS (Communauté Intercommunale des Villes Solidaires), siren : 249740077, nature : Communauté d''agglomération,
siège : {commune siège : nom : Saint-Pierre, code commune : 97416, siren : 219740164, département : 974, code région : 04, population totale : 84950, municipale : 84077, comptée à part : 873, arrondissement : 2,
commune : {{Code commune : 97416, Nom : Saint-Pierre, Type : null, EPCI : null (null) - null, Nom en majuscules : null, Code département : null - null, Code région : null, Commune parente (d'arrondissement, déléguée, associée) : null, Type article et charnière : null, SIREN de la commune : 219740164, SIREN de l'intercommunalité dont elle est membre : null, arrondissement : 2, code canton : null, population : 84950 (strate : null), surface (en hectares) : null, longitude: null, latitude : null}} (code commune : {13}), département : 974, code région : 04, date de création : 2002-12-26, date d'effet : 2002-12-31,
adresse du siège : Adresse {nom : null, complément de nom : null, complément de voie : 29 Route de l'Entre-Deux, type de voie : null, numéro dans la voie : null, répétition dans la voie : null, voie : null, code postal : 97410, commune : 97410 SAINT-PIERRE, commentaire : null, code commune: null, bureau distributeur : null, code cedex : null, libellé cedex : null, code pays étranger : null, nom pays étranger : null}, téléphone : , fax siège : null, e-mail : [email protected], site internet : www.civis.re,
adresse administrative : Adresse {nom : null, complément de nom : null, complément de voie : null, type de voie : null, numéro dans la voie : null, répétition dans la voie : null, voie : 29, route de l'Entre-Deux, code postal : 97410, commune : 97410 SAINT-PIERRE, commentaire : null, code commune: null, bureau distributeur : null, code cedex : null, libellé cedex : null, code pays étranger : null, nom pays étranger : null}, téléphone administratif : , fax administratif : null},
population : 183407,
organigramme : {mode de répartition des sièges : RDC, autre mode : null, président(e) : (civilité : M., prénom : Michel, nom : FONTAINE), nombre de délégués : null, représentation/substitution (obsolète après 2024) : 0},
compétences : {nombre de compétences exercées (déclarées) : 43, compétences exercées : {C1533=code : C1533, obligatoire : null, déléguée : null, intérêt communautaire : null, C1510=code : C1510, obligatoire : null, déléguée : null, intérêt communautaire : null, C1532=code : C1532, obligatoire : null, déléguée : null, intérêt communautaire : null, [...]}, syndicat à la carte : null, compétence conservée : false}, nombre de communes du groupement: 6, densité démographique au km²: null, interdépartemental : false, zone de montagne : null, établissement public territorial de bassin : null, gestion des eaux : null,
fiscalité et financement : {Dotation globale : null, dotation de compensation : null, dotation de l'intercommunalité : null, dotation des groupements touristiques : null, population dotation globale de financement : null, DGF par habitant : null, potentiel fiscal : null, DGF bonifiée : 0, DSC : 0, autre taxe : null, autre redevance : null}, ordures ménagères : (redevance false, taxe : true),
membres : {
219740057=siren : 249740077, catégorie membres : Commune, commune membre : {nom : Petite-Île, code commune : 97405, siren : 219740057, département : 974, code région : 04, population totale : 12772, municipale : 12617, comptée à part : 155}, membre adhérent à un syndicat mixte : {nom : SM d'études et de programmation du SCOT du Grand Sud, siren : 259741080, population : 317956},
219740016=siren : 249740077, catégorie membres : Commune, commune membre : {nom : Les Avirons, code commune : 97401, siren : 219740016, département : 974, code région : 04, population totale : 11661, municipale : 11434, comptée à part : 227}, membre adhérent à un syndicat mixte : {nom : Ile De La Réunion Mobilités, siren : 200045276, population : 880875},
219740248=siren : 249740077, catégorie membres : Commune, commune membre : {nom : Cilaos, code commune : 97424, siren : 219740248, département : 974, code région : 04, population totale : 5437, municipale : 5390, comptée à part : 47}, membre adhérent à un syndicat mixte : {nom : SM de Pierrefonds, siren : 259741007, population : 353322},
219740149=siren : 249740077, catégorie membres : Commune, commune membre : {nom : Saint-Louis, code commune : 97414, siren : 219740149, département : 974, code région : 04, population totale : 54557, municipale : 53935, comptée à part : 622}, membre adhérent à un syndicat mixte : {nom : SAEP des Hirondelles, siren : 200101079, population : 317956},
219740040=siren : 249740077, catégorie membres : Commune, commune membre : {nom : L'Étang-Salé, code commune : 97404, siren : 219740040, département : 974, code région : 04, population totale : 14030, municipale : 13836, comptée à part : 194}, membre adhérent à un syndicat mixte : {nom : SM de traitement des déchets des microrégions Sud et Ouest de La Réunion dénommé ILEVA, siren : 200045342, population : 536140},
219740164=siren : 249740077, catégorie membres : Commune, commune membre : {nom : Saint-Pierre, code commune : 97416, siren : 219740164, département : 974, code région : 04, population totale : 84950, municipale : 84077, comptée à part : 873}, membre adhérent à un syndicat mixte : {nom : null, siren : null, population : null}
}
But it breaks Spark flow (no Dataset<Groupement> with items properly filled with cities exists). And it can cause a memory overflow.


