
TensorFlow Programmer’s Guide Part 2

26 Jul 2017

Threading and Queues

Queues are a powerful mechanism for asynchronous computation using TensorFlow.

A queue is a node in a TensorFlow graph. Like a variable, it is a stateful node: other nodes can modify its contents by enqueuing and dequeuing elements (see the sketch below).

Enqueue, EnqueueMany, and Dequeue are special nodes. They take a pointer to the queue instead of a normal value, which allows them to modify it.

NOTE: Queue methods must run on the same device as the queue.
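
To make this concrete, here is a minimal runnable sketch (the classic example from the official guide, using the TF 1.x API): a FIFOQueue starts out holding zeros, and each run of the increment op dequeues a value, adds one, and enqueues the result back.

import tensorflow as tf

# A queue holding up to three float32 elements, initially [0, 0, 0].
q = tf.FIFOQueue(3, tf.float32)
init = q.enqueue_many(([0., 0., 0.],))

# Running q_inc dequeues one element, adds one, and enqueues the result.
x = q.dequeue()
y = x + 1
q_inc = q.enqueue([y])

with tf.Session() as sess:
    sess.run(init)
    for _ in range(3):
        sess.run(q_inc)           # queue goes [0,0,1] -> [0,1,1] -> [1,1,1]
    print(sess.run(q.dequeue()))  # 1.0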

Queue usage overview

A typical input architecture is to use a RandomShuffleQueue to prepare inputs for training a model: multiple threads prepare training examples and push them in the queue, while a training thread executes a training op that dequeues mini-batches from the queue (see the QueueRunner example below).

The TensorFlow Session object is multithreaded, so multiple threads can easily use the same session and run ops in parallel. In order to use threads as described above, all threads must be able to stop together, exceptions must be caught and reported, and queues must be properly closed when stopping.

TensorFlow provides two classes to help:

Coordinator

The Coordinator class helps multiple threads stop together.

Its key methods are:

  - should_stop(): returns True if the threads should stop.
  - request_stop(): requests that the threads should stop.
  - join(): waits until the specified threads have stopped.

You first create a Coordinator object, and then create a number of threads that use the coordinator. The threads typically run loops that stop when should_stop() returns True.

Any thread can decide that the computation should stop. It only has to call request_stop(), and the other threads will stop, since should_stop() will then return True.

import threading
import tensorflow as tf

# Thread body: loop until the coordinator indicates a stop was requested.
# If some condition becomes true, ask the coordinator to stop.
def MyLoop(coord):
    while not coord.should_stop():
        ... do something ...
        if ... some condition ...:
            coord.request_stop()

# Main thread: create a coordinator.
coord = tf.train.Coordinator()

# Create 10 threads that run MyLoop().
threads = [threading.Thread(target=MyLoop, args=(coord,)) for i in xrange(10)]

# Start the threads and wait for all of them to stop.
for t in threads:
    t.start()
coord.join(threads)

QueueRunner

The QueueRunner class creates a number of threads that repeatedly run an enqueue op. These threads can use a coordinator to stop together. In addition, a queue runner runs a closer thread that automatically closes the queue if an exception is reported to the coordinator.

# First build a graph that uses a TensorFlow queue (e.g. a tf.RandomShuffleQueue) for input examples.
example = ...ops to create one example...
# Create a queue, and an op that enqueues examples one at a time in the queue.
queue = tf.RandomShuffleQueue(...)
enqueue_op = queue.enqueue(example)
# Create a training graph that starts by dequeuing a batch of examples.
inputs = queue.dequeue_many(batch_size)
train_op = ...use 'inputs' to build the training part of the graph...
# Create a queue runner that will run 4 threads in parallel to enqueue examples.
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)

# Launch the graph
sess = tf.Session()
# Create a coordinator, launch the queue runner threads
coord = tf.train.Coordinator()
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
# Run the training loop, controlling termination with the coordinator
for step in xrange(1000000):
    if coord.should_stop():
        break
    sess.run(train_op)
# When done, ask the threads to stop
coord.request_stop()
# And wait for them to actually do it
coord.join(enqueue_threads)

Handling Exceptions

Here is an improved version of the training loop above, which reports exceptions to the coordinator so that all threads stop together:

try:
    for step in xrange(1000000):
        if coord.should_stop():
            break
        sess.run(train_op)
except Exception as e:
    # Report exceptions to the coordinator.
    coord.request_stop(e)
finally:
    # Terminate as usual. It is safe to call `coord.request_stop()` twice.
    coord.request_stop()
    coord.join(threads)


Reading data

There are three main methods of getting data into a TensorFlow program:

  1. Feeding: Python code supplies the data when running each step.
  2. Reading from files: an input pipeline reads the data from files at the beginning of a TensorFlow graph.
  3. Preloaded data: a constant or variable in the TensorFlow graph holds all the data (for small data sets).

Feeding

TensorFlow’s feed mechanism lets you inject data into any Tensor in a computation graph. A Python computation can thus feed data directly into the graph.

Supply feed data through the feed_dict argument to a run() or eval() call that initiates computation.

with tf.Session():
  input = tf.placeholder(tf.float32)
  classifier = ...
  print(classifier.eval(feed_dict={input: my_python_preprocessing_fn()}))

While you can replace any Tensor with feed data, including variables and constants, the best practice is to use a tf.placeholder node. A placeholder exists solely to serve as the target of feeds. It is not initialized and contains no data. A placeholder generates an error if it is executed without a feed, so you won’t forget to feed it.

An example using placeholder and feeding to train on MNIST data can be found in tensorflow/examples/tutorials/mnist/fully_connected_feed.py, and is described in the MNIST tutorial.

Reading from files

A typical pipeline for reading records from files has the following stages:

  1. The list of filenames
  2. Optional filename shuffling
  3. Optional epoch limit
  4. Filename queue
  5. A Reader for the file format
  6. A decoder for a record read by the reader
  7. Optional preprocessing
  8. Example queue

Filenames, shuffling, and epoch limits

For the list of filenames, use either a constant string Tensor (like ["file0", "file1"] or [("file%d" % i) for i in range(2)]) or the tf.train.match_filenames_once function.
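
A minimal sketch of both options (the glob pattern and directory are illustrative, not from the original guide):

import tensorflow as tf

# Option 1: a constant list of filenames.
filenames = ["file%d.csv" % i for i in range(2)]

# Option 2: match files by pattern. Note that match_filenames_once
# creates a variable, so it must be initialized (e.g. with
# tf.local_variables_initializer()) before the queue runners start.
filenames = tf.train.match_filenames_once("./data/*.csv")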

Pass the list of filenames to the tf.train.string_input_producer function. string_input_producer creates a FIFO queue for holding the filenames until the reader needs them.

string_input_producer has options for shuffling and setting a maximum number of epochs. A queue runner adds the whole list of filenames to the queue once for each epoch, shuffling the filenames within an epoch if shuffle=True. This procedure provides a uniform sampling of files, so that examples are not under- or over- sampled relative to each other.

The queue runner works in a thread separate from the reader that pulls filenames from the queue, so the shuffling and enqueuing process does not block the reader.

File formats

Select the reader that matches your input file format and pass the filename queue to the reader’s read method. The read method outputs a key identifying the file and record (useful for debugging if you have some weird records), and a scalar string value. Use one (or more) of the decoder and conversion ops to decode this string into the tensors that make up an example.

CSV files

To read text files in comma-separated value (CSV) format, use a tf.TextLineReader with the tf.decode_csv operation. For example:

filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"])

reader = tf.TextLineReader()
key, value = reader.read(filename_queue)

# Default values, in case of empty columns. Also specifies the type of the
# decoded result.
record_defaults = [[1], [1], [1], [1], [1]]
col1, col2, col3, col4, col5 = tf.decode_csv(
    value, record_defaults=record_defaults)
features = tf.stack([col1, col2, col3, col4])

with tf.Session() as sess:
  # Start populating the filename queue.
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  for i in range(1200):
    # Retrieve a single instance:
    example, label = sess.run([features, col5])

  coord.request_stop()
  coord.join(threads)

Each execution of read reads a single line from the file. The decode_csv op then parses the result into a list of tensors. The record_defaults argument determines the type of the resulting tensors and sets the default value to use if a value is missing in the input string.

You must call tf.train.start_queue_runners to populate the queue before you call run or eval to execute the read. Otherwise read will block while it waits for filenames from the queue.

Fixed length records

To read binary files in which each record is a fixed number of bytes, use tf.FixedLengthRecordReader with the tf.decode_raw operation. The decode_raw op converts from a string to a uint8 tensor.

For example, the CIFAR-10 dataset uses a file format where each record is represented using a fixed number of bytes: 1 byte for the label followed by 3072 bytes of image data. Once you have a uint8 tensor, standard operations can slice out each piece and reformat as needed. For CIFAR-10, you can see how to do the reading and decoding in tensorflow_models/tutorials/image/cifar10/cifar10_input.py, as described in the CIFAR-10 tutorial.
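
A sketch of decoding that record layout (TF 1.x API, following the approach of cifar10_input.py; the filename is illustrative):

import tensorflow as tf

label_bytes = 1
image_bytes = 32 * 32 * 3  # 3072 bytes of image data
record_bytes = label_bytes + image_bytes

filename_queue = tf.train.string_input_producer(["data_batch_1.bin"])
reader = tf.FixedLengthRecordReader(record_bytes=record_bytes)
key, value = reader.read(filename_queue)

# decode_raw converts the scalar string record to a flat uint8 tensor.
record = tf.decode_raw(value, tf.uint8)

# Slice out the label byte and the image bytes, then reshape the image.
label = tf.cast(tf.strided_slice(record, [0], [label_bytes]), tf.int32)
image = tf.reshape(
    tf.strided_slice(record, [label_bytes], [record_bytes]),
    [3, 32, 32])                        # CIFAR-10 stores depth first
image = tf.transpose(image, [1, 2, 0])  # to height x width x channels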

Standard TensorFlow format

Another approach is to convert whatever data you have into a supported format. This approach makes it easier to mix and match data sets and network architectures. The recommended format for TensorFlow is a TFRecords file containing tf.train.Example protocol buffers (which contain Features as a field). You write a little program that gets your data, stuffs it in an Example protocol buffer, serializes the protocol buffer to a string, and then writes the string to a TFRecords file using the tf.python_io.TFRecordWriter. For example, tensorflow/examples/how_tos/reading_data/convert_to_records.py converts MNIST data to this format.

To read a file of TFRecords, use tf.TFRecordReader with the tf.parse_single_example decoder. The parse_single_example op decodes the example protocol buffers into tensors. An MNIST example using the data produced by convert_to_records can be found in tensorflow/examples/how_tos/reading_data/fully_connected_reader.py, which you can compare with the fully_connected_feed version.
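
A minimal sketch of the round trip (TF 1.x API; the feature names 'label' and 'image_raw' are illustrative, not fixed by the format):

import tensorflow as tf

# Writing: stuff the data into an Example, serialize it, write it out.
writer = tf.python_io.TFRecordWriter("examples.tfrecords")
example = tf.train.Example(features=tf.train.Features(feature={
    "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[7])),
    "image_raw": tf.train.Feature(bytes_list=tf.train.BytesList(
        value=[b"\x00" * 784])),  # placeholder pixel data
}))
writer.write(example.SerializeToString())
writer.close()

# Reading: decode one serialized Example back into tensors.
filename_queue = tf.train.string_input_producer(["examples.tfrecords"])
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(serialized_example, features={
    "label": tf.FixedLenFeature([], tf.int64),
    "image_raw": tf.FixedLenFeature([], tf.string),
})
image = tf.decode_raw(features["image_raw"], tf.uint8)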

Preprocessing

You can then do any preprocessing of these examples you want. This would be any processing that doesn’t depend on trainable parameters. Examples include normalization of your data, picking a random slice, adding noise or distortions, etc. See tensorflow_models/tutorials/image/cifar10/cifar10_input.py for an example.
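
For instance, a sketch of typical stateless preprocessing, assuming image is a height x width x channels uint8 tensor from a decoder like the one above:

import tensorflow as tf

# None of these ops depend on trainable parameters.
image = tf.cast(image, tf.float32)
image = tf.random_crop(image, [24, 24, 3])          # pick a random slice
image = tf.image.random_flip_left_right(image)      # a simple distortion
image = tf.image.per_image_standardization(image)   # normalize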

Batching

At the end of the pipeline we use another queue to batch together examples for training, evaluation, or inference. For this we use a queue that randomizes the order of examples, using the tf.train.shuffle_batch function.

Example:

def read_my_file_format(filename_queue):
  reader = tf.SomeReader()
  key, record_string = reader.read(filename_queue)
  example, label = tf.some_decoder(record_string)
  processed_example = some_processing(example)
  return processed_example, label

def input_pipeline(filenames, batch_size, num_epochs=None):
  filename_queue = tf.train.string_input_producer(
      filenames, num_epochs=num_epochs, shuffle=True)
  example, label = read_my_file_format(filename_queue)
  # min_after_dequeue defines how big a buffer we will randomly sample
  #   from -- bigger means better shuffling but slower start up and more
  #   memory used.
  # capacity must be larger than min_after_dequeue and the amount larger
  #   determines the maximum we will prefetch.  Recommendation:
  #   min_after_dequeue + (num_threads + a small safety margin) * batch_size
  min_after_dequeue = 10000
  capacity = min_after_dequeue + 3 * batch_size
  example_batch, label_batch = tf.train.shuffle_batch(
      [example, label], batch_size=batch_size, capacity=capacity,
      min_after_dequeue=min_after_dequeue)
  return example_batch, label_batch

If you need more parallelism or shuffling of examples between files, use multiple reader instances with the tf.train.shuffle_batch_join function. For example:

def read_my_file_format(filename_queue):
  # Same as above

def input_pipeline(filenames, batch_size, read_threads, num_epochs=None):
  filename_queue = tf.train.string_input_producer(
      filenames, num_epochs=num_epochs, shuffle=True)
  example_list = [read_my_file_format(filename_queue)
                  for _ in range(read_threads)]
  min_after_dequeue = 10000
  capacity = min_after_dequeue + 3 * batch_size
  example_batch, label_batch = tf.train.shuffle_batch_join(
      example_list, batch_size=batch_size, capacity=capacity,
      min_after_dequeue=min_after_dequeue)
  return example_batch, label_batch

You still only use a single filename queue that is shared by all the readers. That way we ensure that the different readers use different files from the same epoch until all the files from the epoch have been started. (It is also usually sufficient to have a single thread filling the filename queue.)

An alternative is to use a single reader via the tf.train.shuffle_batch with num_threads bigger than 1. This will make it read from a single file at the same time (but faster than with 1 thread), instead of N files at once. This can be important:

  - If you have more reading threads than input files, to avoid the risk that two threads read the same example from the same file near each other.
  - Or if reading N files in parallel causes too many disk seeks.
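
A sketch of this single-reader variant, reusing the names from input_pipeline above:

example_batch, label_batch = tf.train.shuffle_batch(
    [example, label], batch_size=batch_size, num_threads=4,
    capacity=capacity, min_after_dequeue=min_after_dequeue)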

How many threads do you need? The tf.train.shuffle_batch* functions add a summary to the graph that indicates how full the example queue is. If you have enough reading threads, that summary will stay above zero. You can view your summaries as training progresses using TensorBoard.

Creating threads to prefetch using QueueRunner objects

The short version: many of the tf.train functions listed above add tf.train.QueueRunner objects to your graph. These require that you call tf.train.start_queue_runners before running any training or inference steps, or it will hang forever. This will start threads that run the input pipeline, filling the example queue so that the dequeue to get the examples will succeed. This is best combined with a tf.train.Coordinator to cleanly shut down these threads when there are errors. If you set a limit on the number of epochs, that will use an epoch counter that will need to be initialized. The recommended code pattern combining these is:

# Create the graph, etc.
init_op = tf.global_variables_initializer()

# Create a session for running operations in the Graph.
sess = tf.Session()

# Initialize the variables (like the epoch counter).
sess.run(init_op)

# Start input enqueue threads.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

try:
    while not coord.should_stop():
        # Run training steps or whatever
        sess.run(train_op)

except tf.errors.OutOfRangeError:
    print('Done training -- epoch limit reached')
finally:
    # When done, ask the threads to stop.
    coord.request_stop()

# Wait for threads to finish.
coord.join(threads)
sess.close()

Aside: What is happening here?

First we create the graph. It will have a few pipeline stages that are connected by queues. The first stage will generate filenames to read and enqueue them in the filename queue. The second stage consumes filenames (using a Reader), produces examples, and enqueues them in an example queue. Depending on how you have set things up, you may actually have a few independent copies of the second stage, so that you can read from multiple files in parallel. At the end of these stages is an enqueue operation, which enqueues into a queue that the next stage dequeues from. We want to start threads running these enqueuing operations, so that our training loop can dequeue examples from the example queue.


The helpers in tf.train that create these queues and enqueuing operations add a tf.train.QueueRunner to the graph using the tf.train.add_queue_runner function. Each QueueRunner is responsible for one stage, and holds the list of enqueue operations that need to be run in threads. Once the graph is constructed, the tf.train.start_queue_runners function asks each QueueRunner in the graph to start its threads running the enqueuing operations.

If all goes well, you can now run your training steps and the queues will be filled by the background threads. If you have set an epoch limit, at some point an attempt to dequeue examples will get a tf.errors.OutOfRangeError. This is the TensorFlow equivalent of “end of file” (EOF): the epoch limit has been reached and no more examples are available.

The last ingredient is the tf.train.Coordinator. This is responsible for letting all the threads know if anything has signalled a shut down. Most commonly this would be because an exception was raised, for example one of the threads got an error when running some operation (or an ordinary Python exception).

For more about threading, queues, QueueRunners, and Coordinators, see the Threading and Queues section above.

Aside: How clean shut-down when limiting epochs works

Imagine you have a model that has set a limit on the number of epochs to train on. That means that the thread generating filenames will only run that many times before generating an OutOfRange error. The QueueRunner will catch that error, close the filename queue, and exit the thread.

