0

I have a user case like this. I have a list of many queries. I am running multi-threading with pyspark with each thread submitting some sql.

There are some queries that report success but the final table has no data. Spark-ui has some tasks in stage that report OOM, but in the end it is successful. Have you encountered any similar cases or have any comments? spark-ui

def run_single_rule(self, log):
    try:
        dataset = self.spark.sql(sql_filter)
        result_count = dataset.count()
        print(
            f"""
              Statement for building rule [{log.rule_id}] result:
              {sql_filter}
              dataset after processed contains {result_count} records
              """
        )
        write_to_uc_table(dataset, self.job_output)

    except Exception:
        logger.warning(
            f"""
                       rule_id = {log.rule_id} has failed with log:
                       {full_error_log}
                       """
        )
    return update_values
with ThreadPoolExecutor(max_workers=10) as executor:
    future_job_runs = {
        executor.submit(self.run_single_rule, query): query
        for query in not_processed_log
    }
    wait(future_job_runs)
    for future in as_completed(future_job_runs):
        log = future_job_runs[future]
        try:
            log_records.append(future.result())
        except Exception as exc:
            print(f"rule_id: {log.rule_id}  generated an exception: {exc}")
        else:
            print(
                f"Finist log_id: {log.rule_id} finised with result: {future.result()}"
            )
def write_to_uc_table(df, job_output: JobOutput):
    writer = df.write
    if job_output.partition_columns:
        writer = writer.partitionBy(job_output.partition_columns)
    if job_output.write_mode == WriteMode.OVERWRITE_PARTITION:
        writer = writer.option("partitionOverwriteMode", "dynamic")
        writer = writer.mode("overwrite")
    else:
        writer = writer.mode(job_output.write_mode)
    if job_output.options:
        for k, v in job_output.options.items():
            writer = writer.option(k, v)
    writer.saveAsTable(
        f"{job_output.target_catalog}.{job_output.target_schema}.{job_output.target_table}"
    )

There are some queries that report success but the final table has no data for example:

Statement for building rule [10030006] result:
SELECT ... FROM ...
dataset after processed contains 650048 records
Finist log_id: 10030006 finised with result: {'job_start_time': datetime.datetime(2025, 10, 31, 1, 7, 2, 469565, tzinfo=datetime.timezone.utc), 'log_id': '5763b7d8-b5ee-11f0-a43c-00163eb2a776', 'result_record_count': None, 'state': 'FAILED', 'job_end_time': datetime.datetime(2025, 10, 31, 1, 7, 25, 763043, tzinfo=datetime.timezone.utc)}

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.