I have a use case like this: I have a list of many queries, and I run them with multi-threading in PySpark, each thread submitting some SQL.
Some queries report success but the final table has no data. In the Spark UI, some tasks in a stage report OOM, yet the stage ultimately succeeds. Have you encountered any similar cases, or do you have any comments?
def run_single_rule(self, log):
    try:
        # sql_filter is built from `log` earlier (trimmed from this snippet)
        dataset = self.spark.sql(sql_filter)
        result_count = dataset.count()  # first action: runs its own Spark job
        print(
            f"""
            Statement for building rule [{log.rule_id}] result:
            {sql_filter}
            dataset after processing contains {result_count} records
            """
        )
        write_to_uc_table(dataset, self.job_output)  # second action: the write job
    except Exception:
        # full_error_log is assembled elsewhere (trimmed); note the exception is
        # swallowed here, so a failed write still falls through to the return
        logger.warning(
            f"""
            rule_id = {log.rule_id} has failed with log:
            {full_error_log}
            """
        )
    return update_values  # status dict built elsewhere (see the log output below)
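One detail that may matter here: count() and the write are two separate Spark actions, so the write re-runs the whole plan and can die (e.g. from OOM) even after the count succeeded. A minimal sketch, reusing the names from the snippet above, that persists the dataset once so both actions share the computed partitions:

    from pyspark import StorageLevel

    # Persist so count() and the subsequent write do not each recompute the plan
    dataset = dataset.persist(StorageLevel.MEMORY_AND_DISK)
    result_count = dataset.count()               # materializes the cache
    write_to_uc_table(dataset, self.job_output)  # reuses the cached partitions
    dataset.unpersist()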
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=10) as executor:
    future_job_runs = {
        executor.submit(self.run_single_rule, query): query
        for query in not_processed_log
    }
    # no separate wait() needed: as_completed blocks until each future is done
    for future in as_completed(future_job_runs):
        log = future_job_runs[future]
        try:
            result = future.result()
            log_records.append(result)
        except Exception as exc:
            print(f"rule_id: {log.rule_id} generated an exception: {exc}")
        else:
            print(f"Finished log_id: {log.rule_id} with result: {result}")
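Because run_single_rule catches its own exceptions and still returns update_values, future.result() never raises in this loop, so a FAILED run is printed as "Finished". A sketch of one way to surface that, assuming update_values is the status dict shown in the log output at the bottom (drop-in replacement for the loop above):

    for future in as_completed(future_job_runs):
        log = future_job_runs[future]
        result = future.result()  # never raises: run_single_rule already caught
        if result.get("state") == "FAILED":
            print(f"rule_id: {log.rule_id} FAILED: {result}")
        else:
            log_records.append(result)
            print(f"Finished log_id: {log.rule_id} with result: {result}")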
def write_to_uc_table(df, job_output: JobOutput):
    writer = df.write
    if job_output.partition_columns:
        writer = writer.partitionBy(job_output.partition_columns)
    if job_output.write_mode == WriteMode.OVERWRITE_PARTITION:
        # dynamic partition overwrite: only partitions present in df are replaced
        writer = writer.option("partitionOverwriteMode", "dynamic")
        writer = writer.mode("overwrite")
    else:
        writer = writer.mode(job_output.write_mode)
    if job_output.options:
        for k, v in job_output.options.items():
            writer = writer.option(k, v)
    writer.saveAsTable(
        f"{job_output.target_catalog}.{job_output.target_schema}.{job_output.target_table}"
    )
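Since the printed count reflects only the read side and says nothing about the write job, a small sanity check could be bolted onto the end of write_to_uc_table. A sketch, reusing the names above (df.sparkSession is the session accessor in PySpark 3.3+):

    # Read the table back right after the write to confirm rows actually landed
    target = f"{job_output.target_catalog}.{job_output.target_schema}.{job_output.target_table}"
    written_rows = df.sparkSession.table(target).count()
    print(f"{target} now holds {written_rows} rows")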
For example, here is a query that reported success while the final table ended up with no data:
Statement for building rule [10030006] result:
SELECT ... FROM ...
dataset after processing contains 650048 records
Finished log_id: 10030006 with result: {'job_start_time': datetime.datetime(2025, 10, 31, 1, 7, 2, 469565, tzinfo=datetime.timezone.utc), 'log_id': '5763b7d8-b5ee-11f0-a43c-00163eb2a776', 'result_record_count': None, 'state': 'FAILED', 'job_end_time': datetime.datetime(2025, 10, 31, 1, 7, 25, 763043, tzinfo=datetime.timezone.utc)}
Note that the result dict the loop prints actually contains 'state': 'FAILED' and 'result_record_count': None, even though the run is reported as finished.