I run this code daily and save the output to a chosen location:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{lit, max, min}
import spark.implicits._

def aggConnections()(input: DataFrame): DataFrame = {
  input
    // group by the first element of the spid and cxid array columns
    .groupBy(
      $"spid"(0).as("domain_userid"),
      $"cxid"(0).as("key_id"))
    .agg(
      min($"Time").as("first_seen"),
      max($"Time").as("last_seen"))
    .select($"domain_userid", $"key_id",
      lit("cxense_id").as("key_source"),
      lit(1).as("times_seen"),
      $"first_seen", $"last_seen")
    .filter($"domain_userid".isNotNull && $"key_id".isNotNull)
}
val requests = spark.read.parquet("day1").transform(aggConnections())
The spid column is an array of strings, and in the schema it looks like this:
spid:array
element:string
That's why I had to access it like $"spid"(0).as("domain_userid").
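As far as I understand, $"spid"(0) is just element extraction on the array column, so the same access could also be written with getItem or element_at (element_at uses 1-based indexing); a minimal sketch of the equivalent forms:

import org.apache.spark.sql.functions.element_at

// Three ways (equivalent, as far as I know) to take the first element of spid:
val viaApply   = $"spid"(0).as("domain_userid")              // what the job uses now
val viaGetItem = $"spid".getItem(0).as("domain_userid")
val viaElemAt  = element_at($"spid", 1).as("domain_userid")  // 1-based index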
Unfortunately, on some days the job fails with the following error when running on AWS EMR with Spark 3.0.1:
diagnostics: User class threw exception: org.apache.spark.sql.AnalysisException: cannot resolve '`spid`' given input columns: [AdUnitId, AudienceSegmentIds, BandWidth, BandwidthGroupId, BandwidthId, Browser, BrowserId, City, CityId, CmsMetadata, Country, CountryId, DeviceCategory, Domain, GfpContentId, IsCompanion, IsFilledRequest, IsInterstitial, IsVideoFallbackRequest, KeyPart, Metro, MetroId, MobileAppId, MobileCapability, MobileCarrier, MobileDevice, OS, OSId, OSVersion, PodPosition, PostalCode, PostalCodeId, PublisherProvidedID, RefererURL, Region, RegionId, RequestLanguage, RequestedAdUnitSizes, Time, TimeUsec2, UserId, VideoPosition, addefend, ads_enabled, app_name, app_version, bereidingstijd, brand, careerlevel, cat, category, categorycode, categoryname, channel_id, channel_title, cid, cmssource, companyname, compid, content_url, contentid, contentsubstype, crt_cpm, crt_size, cxid, cxsg, dc_yt, deviceid, dos, dossier, educationlevel, floor, fsr, gang, gdpr_applies, gdpr_consentstring, gelegenheid, hb_pb, hb_size, hour, ifa, industry, ingredient, iom, itemid, ix_apnx_om, ix_cdb_om, ix_imdi_cpm, jobtitle, k21, kage, kar, kauth, kembed, keuken, kgender, klg, ko, kpid, kvlg, kvz, kw, lat, long, mc_asset_type, mediation, model, moeilijkheid, origin, pag, pagetype, path, pay, pos, positie, production_id, productname, retailercode, retailername, rpfl_12108, screen, sector, shopid, show_id, show_title, soort, src, stad, starttype, station, subforum, tag, theme, top, video_duration, video_label, yt_vrallowed, ytdevice];; 'Aggregate ['spid[0], cxid#136[0]], ['spid[0] AS domain_userid#276, cxid#136[0] AS key_id#277, min(Time#0) AS first_seen#417, max(Time#0) AS last_seen#419]
I thought the problem was that the spid column was not present, however it is in the data when I check the schema. Therefore, I decided to test it more carefully on Databricks. The weird thing is that if I run this code on Databricks with the same Spark version, 3.0.1, everything works fine on every day of data, including the days that fail on EMR. I really can't explain what is happening.
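For completeness, this is roughly how I check that the column is there on a day that fails on EMR (a minimal sketch, using the same placeholder path as above):

// Read one of the failing days and confirm spid shows up in the schema.
val day = spark.read.parquet("day1")
day.printSchema()                               // spid: array<string> is listed
println(day.schema.fieldNames.contains("spid")) // prints true when I run this check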