3

Let's say I have this data frame saved in a parquet format

import numpy as np
import pandas as pd

data = pd.DataFrame(dict(
    a=[1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 
    b=[1.0, 1.0, 1.0, np.NaN, 0.0, np.NaN],
    c=[0.9, np.NaN, 1.0, 0.0, 0.0, 0.0]
))

data.to_parquet('data.parquet')

along with a dictionary that tells me which values I should use for the imputation. Then I can write a preprocessing function.

import tensorflow as tf

impute_dictionary = dict(b=1.0, c=0.0)

def preprocessing_fn(inputs):
    outputs = inputs.copy()

    for key, value in impute_dictionary.items():
        outputs[key] = tf.where(
            tf.math.is_nan(outputs[key]),
            tf.constant(value, shape=outputs[key].shape),
            outputs[key]
        )

    return outputs

and use it in Apache Beam pipeline

import tempfile

import apache_beam as beam
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata, schema_utils

temp = tempfile.gettempdir()

RAW_DATA_FEATURE_SPEC = dict(
    [(name, tf.io.FixedLenFeature([], tf.float32)) for name in ['a', 'b', 'c']] 
)

RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(schema_utils.schema_from_feature_spec(RAW_DATA_FEATURE_SPEC))

with beam.Pipeline() as pipeline:
    with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
        raw_data = pipeline | 'ReadTrainData' >> beam.io.ReadFromParquet('data.parquet')
        raw_dataset = (raw_data, RAW_DATA_METADATA)
        transformed_dataset, transform_fn = (raw_dataset | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
        transformed_data, transformed_metadata = transformed_dataset
        transformed_data_coder = tft.coders.ExampleProtoCoder(transformed_metadata.schema)

I get this error: TypeError: int() argument must be a string, a bytes-like object or a number, not 'NoneType'

It seems that outputs[key].shape is (None,)

Any suggestion?

Package versions:

tensorflow==2.1.0
tensorflow-transform==0.21.0
pandas==1.0.0
numpy==1.18.1
apache-beam==2.19.0

Entire error message:

WARNING: Logging before flag parsing goes to stderr.
W0204 10:36:03.793034 140735593104256 interactive_environment.py:113] Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.
W0204 10:36:03.793169 140735593104256 interactive_environment.py:125] You have limited Interactive Beam features since your ipython kernel is not connected any notebook frontend.
W0204 10:36:03.929135 140735593104256 impl.py:360] Tensorflow version (2.1.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended. 
W0204 10:36:03.929914 140735593104256 impl.py:360] Tensorflow version (2.1.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended. 
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-5-465b3f61784c> in <module>
     17         raw_data = pipeline | 'ReadTrainData' >> beam.io.ReadFromParquet('data.parquet')
     18         raw_dataset = (raw_data, RAW_DATA_METADATA)
---> 19         transformed_dataset, transform_fn = (raw_dataset | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
     20         transformed_data, transformed_metadata = transformed_dataset
     21         transformed_data_coder = tft.coders.ExampleProtoCoder(transformed_metadata.schema)

/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py in __ror__(self, left, label)
    547     pvalueish = _SetInputPValues().visit(pvalueish, replacements)
    548     self.pipeline = p
--> 549     result = p.apply(self, pvalueish, label)
    550     if deferred:
    551       return result

/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
    575       transform.type_check_inputs(pvalueish)
    576 
--> 577     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
    578 
    579     if type_options is not None and type_options.pipeline_type_check:

/usr/local/lib/python3.7/site-packages/apache_beam/runners/runner.py in apply(self, transform, input, options)
    193       m = getattr(self, 'apply_%s' % cls.__name__, None)
    194       if m:
--> 195         return m(transform, input, options)
    196     raise NotImplementedError(
    197         'Execution of [%s] not implemented in runner %s.' % (transform, self))

/usr/local/lib/python3.7/site-packages/apache_beam/runners/runner.py in apply_PTransform(self, transform, input, options)
    223   def apply_PTransform(self, transform, input, options):
    224     # The base case of apply is to call the transform's expand.
--> 225     return transform.expand(input)
    226 
    227   def run_transform(self,

/usr/local/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py in expand(self, dataset)
    861     # e.g. caching the values of expensive computations done in AnalyzeDataset.
    862     transform_fn = (
--> 863         dataset | 'AnalyzeDataset' >> AnalyzeDataset(self._preprocessing_fn))
    864 
    865     if Context.get_use_deep_copy_optimization():

/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py in __ror__(self, pvalueish, _unused)
    987 
    988   def __ror__(self, pvalueish, _unused=None):
--> 989     return self.transform.__ror__(pvalueish, self.label)
    990 
    991   def expand(self, pvalue):

/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py in __ror__(self, left, label)
    547     pvalueish = _SetInputPValues().visit(pvalueish, replacements)
    548     self.pipeline = p
--> 549     result = p.apply(self, pvalueish, label)
    550     if deferred:
    551       return result

/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
    534       try:
    535         old_label, transform.label = transform.label, label
--> 536         return self.apply(transform, pvalueish)
    537       finally:
    538         transform.label = old_label

/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
    575       transform.type_check_inputs(pvalueish)
    576 
--> 577     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
    578 
    579     if type_options is not None and type_options.pipeline_type_check:

/usr/local/lib/python3.7/site-packages/apache_beam/runners/runner.py in apply(self, transform, input, options)
    193       m = getattr(self, 'apply_%s' % cls.__name__, None)
    194       if m:
--> 195         return m(transform, input, options)
    196     raise NotImplementedError(
    197         'Execution of [%s] not implemented in runner %s.' % (transform, self))

/usr/local/lib/python3.7/site-packages/apache_beam/runners/runner.py in apply_PTransform(self, transform, input, options)
    223   def apply_PTransform(self, transform, input, options):
    224     # The base case of apply is to call the transform's expand.
--> 225     return transform.expand(input)
    226 
    227   def run_transform(self,

/usr/local/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py in expand(self, dataset)
    808     input_values, input_metadata = dataset
    809     result, cache = super(AnalyzeDataset, self).expand((input_values, None,
--> 810                                                         None, input_metadata))
    811     assert not cache
    812     return result

/usr/local/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py in expand(self, dataset)
    681         copied_inputs = impl_helper.copy_tensors(input_signature)
    682 
--> 683       output_signature = self._preprocessing_fn(copied_inputs)
    684 
    685     # At this point we check that the preprocessing_fn has at least one

<ipython-input-2-205d9abf4136> in preprocessing_fn(inputs)
      9         outputs[key] = tf.where(
     10             tf.math.is_nan(outputs[key]),
---> 11             tf.constant(value, shape=outputs[key].shape),
     12             outputs[key]
     13         )

/usr/local/lib/python3.7/site-packages/tensorflow_core/python/framework/constant_op.py in constant(value, dtype, shape, name)
    256   """
    257   return _constant_impl(value, dtype, shape, name, verify_shape=False,
--> 258                         allow_broadcast=True)
    259 
    260 

/usr/local/lib/python3.7/site-packages/tensorflow_core/python/framework/constant_op.py in _constant_impl(value, dtype, shape, name, verify_shape, allow_broadcast)
    294       tensor_util.make_tensor_proto(
    295           value, dtype=dtype, shape=shape, verify_shape=verify_shape,
--> 296           allow_broadcast=allow_broadcast))
    297   dtype_value = attr_value_pb2.AttrValue(type=tensor_value.tensor.dtype)
    298   const_tensor = g._create_op_internal(  # pylint: disable=protected-access

/usr/local/lib/python3.7/site-packages/tensorflow_core/python/framework/tensor_util.py in make_tensor_proto(values, dtype, shape, verify_shape, allow_broadcast)
    446     # If shape is None, numpy.prod returns None when dtype is not set, but
    447     # raises exception when dtype is set to np.int64
--> 448     if shape is not None and np.prod(shape, dtype=np.int64) == 0:
    449       nparray = np.empty(shape, dtype=np_dt)
    450     else:

<__array_function__ internals> in prod(*args, **kwargs)

/usr/local/lib/python3.7/site-packages/numpy/core/fromnumeric.py in prod(a, axis, dtype, out, keepdims, initial, where)
   2960     """
   2961     return _wrapreduction(a, np.multiply, 'prod', axis, dtype, out,
-> 2962                           keepdims=keepdims, initial=initial, where=where)
   2963 
   2964 

/usr/local/lib/python3.7/site-packages/numpy/core/fromnumeric.py in _wrapreduction(obj, ufunc, method, axis, dtype, out, **kwargs)
     88                 return reduction(axis=axis, out=out, **passkwargs)
     89 
---> 90     return ufunc.reduce(obj, axis, dtype, out, **passkwargs)
     91 
     92 

TypeError: int() argument must be a string, a bytes-like object or a number, not 'NoneType'
2
  • Please share the entire error message. Commented Feb 3, 2020 at 23:42
  • @AMC I've shared it. Thank you for pointing this out. Commented Feb 4, 2020 at 10:39

1 Answer 1

5

The problem is that I set the shape in tf.constant(value, shape=outputs[key].shape). I should have only used tf.constant(value, dtype=tf.float32).

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.