78

I am loading some data into Spark with a wrapper function:

import os
from pyspark.sql.functions import lit

def load_data( filename ):
    df = sqlContext.read.format("com.databricks.spark.csv")\
        .option("delimiter", "\t")\
        .option("header", "false")\
        .option("mode", "DROPMALFORMED")\
        .load(filename)
    # add the filename base as hostname (strip both extensions, e.g. ".gz" then ".txt")
    ( hostname, _ ) = os.path.splitext( os.path.basename(filename) )
    ( hostname, _ ) = os.path.splitext( hostname )
    df = df.withColumn('hostname', lit(hostname))
    return df

Specifically, I am using a glob to load a bunch of files at once:

df = load_data( '/scratch/*.txt.gz' )

The files are:

/scratch/host1.txt.gz
/scratch/host2.txt.gz
...

I would like the 'hostname' column to contain the actual name of the file each row was loaded from rather than the glob (i.e. host1, host2, etc., rather than *).

How can I do this?

3 Answers

168

You can use input_file_name which:

Creates a string column for the file name of the current Spark task.

from pyspark.sql.functions import input_file_name

df.withColumn("filename", input_file_name())

Same thing in Scala:

import org.apache.spark.sql.functions.input_file_name

df.withColumn("filename", input_file_name)
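
input_file_name() yields the full path of the file (for example something like file:///scratch/host1.txt.gz), not the bare host name the question asks for. A minimal sketch of trimming it down with regexp_extract follows, assuming every file ends in .txt.gz as in the question. The names HOSTNAME_PATTERN, with_hostname and hostname_from_path are hypothetical and only for illustration; the pure-Python helper is there so the pattern can be sanity-checked without a cluster.

```python
import re

# Assumption: every input file is named <hostname>.txt.gz, as in the question.
# Group 1 captures the last path component without the ".txt.gz" suffix.
HOSTNAME_PATTERN = r"([^/]+)\.txt\.gz$"


def with_hostname(df):
    """Return df with a 'hostname' column derived from each row's source file."""
    # Imported here so the pattern helper below can be used without pyspark installed.
    from pyspark.sql.functions import input_file_name, regexp_extract

    return df.withColumn(
        "hostname",
        regexp_extract(input_file_name(), HOSTNAME_PATTERN, 1),
    )


def hostname_from_path(path):
    """Apply the same pattern to a plain string, for checking it locally."""
    match = re.search(HOSTNAME_PATTERN, path)
    return match.group(1) if match else ""
```

With this, the lit(hostname) step in the question's load_data could be replaced by returning with_hostname(df). If the files use other extensions, the pattern would need adjusting.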

2 Comments

Is it possible to get the file creation date in a similar way? I didn't find any function for it in pyspark.sql.functions.
If you load csv files, don't use spark.read.csv("<data_path>"), use spark.read.format("csv").load("<data_path>")
5

input_file_name() is deprecated in favor of the _metadata column, e.g. "_metadata.file_name"; see the file-metadata documentation.
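
As a rough sketch of what that could look like for a batch read, the function below selects every existing column plus the file name from _metadata. It assumes df was read directly from a file-based source on a runtime that exposes the _metadata column; add_file_name is a hypothetical helper name, not part of any library.

```python
def add_file_name(df):
    """Return df with an extra 'filename' column taken from _metadata.

    Assumption: df comes straight from a file-based read (CSV, Parquet, ...)
    on a Spark or Databricks runtime that provides the _metadata column.
    """
    # _metadata is not included in "*", so it has to be selected explicitly.
    return df.selectExpr("*", "_metadata.file_name AS filename")
```

Usage would be along the lines of add_file_name(spark.read.format("csv").load("/scratch/*.txt.gz")). Swapping in _metadata.file_path would give the full path instead of just the file name.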

2 Comments

Can you elaborate on how to add the filename using this? I guess something like df.withColumn("filename", F.col("_metadata.file_name"))?
Added as another answer below.
2

This code works:

from pyspark.sql.functions import col, current_timestamp, lit
from pyspark.sql.types import StringType, TimestampType

# The str* variables (format, paths, separator, timestamp) are defined elsewhere.
dfOut = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", strFormat) \
    .option("cloudFiles.schemaLocation", strFolderPathArchive) \
    .option("delimiter", strSeparator) \
    .option("startingTimestamp", strStartingTimestamp) \
    .load(strFolderPathData) \
    .withColumn("timestamp_ingested", lit(current_timestamp()).cast(TimestampType())) \
    .withColumn("input_file_name", col("_metadata.file_path").cast(StringType()))
