TF 生成计算图

原创文章,转载请注明: 转载自慢慢的回味

本文链接地址: TF 生成计算图

生成计算图分析

假设我们有以下的tensorflow程序:
程序中,两个Input tensor A和B,一个常量tensor 2,两个计算Operation Plus2和PlusB。
程序计算A+2的结果。

//============================================================================
// Name        : TensorflowTest.cpp
// Author      : 
// Version     :
// Copyright   : Your copyright notice
// Description : Hello World in C++, Ansi-style
//============================================================================
 
#include <iostream>
#include <tensorflow/c/c_api.h>
#include <tensorflow/c/c_test_util.h>
 
#include <algorithm>
#include <cstddef>
#include <iterator>
#include <memory>
#include <vector>
#include <string.h>
 
using namespace std;
 
int main() {
	cout << "!!!Hello World!!!" << endl; // prints !!!Hello World!!!
	cout << "Hello from TensorFlow C library version" << TF_Version() << endl;
 
	TF_Status* s = TF_NewStatus();
	TF_Graph* graph = TF_NewGraph();
 
	// Construct the graph: A + 2 + B
	TF_Operation* a = Placeholder(graph, s, "A");
	cout << TF_Message(s);
 
	TF_Operation* b = Placeholder(graph, s, "B");
	cout << TF_Message(s);
 
	TF_Operation* two = ScalarConst(2, graph, s);
	cout << TF_Message(s);
 
	TF_Operation* plus2 = Add(a, two, graph, s, "plus2");
	cout << TF_Message(s);
 
	TF_Operation* plusB = Add(plus2, b, graph, s, "plusB");
	cout << TF_Message(s);
 
	// Setup a session and a partial run handle.  The partial run will allow
	// computation of A + 2 + B in two phases (calls to TF_SessionPRun):
	// 1. Feed A and get (A+2)
	// 2. Feed B and get (A+2)+B
	TF_SessionOptions* opts = TF_NewSessionOptions();
	TF_Session* sess = TF_NewSession(graph, opts, s);
	TF_DeleteSessionOptions(opts);
 
	TF_Output feeds[] = { TF_Output { a, 0 }, TF_Output { b, 0 } };
	TF_Output fetches[] = { TF_Output { plus2, 0 }, TF_Output { plusB, 0 } };
 
	const char* handle = nullptr;
	TF_SessionPRunSetup(sess, feeds, TF_ARRAYSIZE(feeds), fetches,
			TF_ARRAYSIZE(fetches), NULL, 0, &handle, s);
	cout << TF_Message(s);
 
	// Feed A and fetch A + 2.
	TF_Output feeds1[] = { TF_Output { a, 0 } };
	TF_Output fetches1[] = { TF_Output { plus2, 0 } };
	TF_Tensor* feedValues1[] = { Int32Tensor(1) };
	TF_Tensor* fetchValues1[1];
	TF_SessionPRun(sess, handle, feeds1, feedValues1, 1, fetches1, fetchValues1,
			1, NULL, 0, s);
	cout << TF_Message(s);
	cout << *(static_cast<int*>(TF_TensorData(fetchValues1[0]))) << endl;
 
	// Clean up.
	TF_DeletePRunHandle(handle);
	TF_DeleteSession(sess, s);
	cout << TF_Message(s);
	TF_DeleteGraph(graph);
	TF_DeleteStatus(s);
	return 0;
}

根据用户程序生成计算图,并优化图:

//code in graph_execution_state.cc
Status GraphExecutionState::BuildGraph(const BuildGraphOptions& options,
                                       std::unique_ptr<ClientGraph>* out) {
  VLOG(1) << "BuildGraph";
  if (!graph_) {
    // It is only valid to call this method directly when the original graph
    // was created with the option `place_pruned_graph == false`.
    return errors::Internal(
        "Attempted to prune a graph that has not been fully initialized.");
  }
 
  // Grappler optimization might change the structure of a graph itself, and
  // also it can add/prune functions to/from the library.
  std::unique_ptr<Graph> optimized_graph;
  std::unique_ptr<FunctionLibraryDefinition> optimized_flib;
//调用OptimizeGraph优化图
  Status s = OptimizeGraph(options, &optimized_graph, &optimized_flib);
  if (!s.ok()) {
    VLOG(2) << "Grappler optimization failed. Error: " << s.error_message();
    // Simply copy the original graph and the function library if we couldn't
    // optimize it.
    optimized_graph.reset(new Graph(flib_def_.get()));
    CopyGraph(*graph_, optimized_graph.get());
    optimized_flib.reset(new FunctionLibraryDefinition(*flib_def_));
  }
......
}

接着调用RunMetaOptimizer进行图优化:

//code in graph_execution_state.cc
Status GraphExecutionState::OptimizeGraph(
    const BuildGraphOptions& options, std::unique_ptr<Graph>* optimized_graph,
    std::unique_ptr<FunctionLibraryDefinition>* optimized_flib) {
#ifndef IS_MOBILE_PLATFORM
  if (session_options_->config.graph_options().place_pruned_graph()) {
    return errors::InvalidArgument("Can't optimize a pruned graph");
  }
 
  if (grappler::MetaOptimizerEnabled(session_options_->config)) {
    grappler::GrapplerItem item;
    item.id = "tf_graph";
    graph_->ToGraphDef(&item.graph);
 
   ......
 
    Device* cpu_device = nullptr;
    for (const auto& device : device_set_->devices()) {
      if (device->parsed_name().id == 0 &&
          StringPiece(device->parsed_name().type) == "CPU" &&
          device->GetAllocator(AllocatorAttributes()) != nullptr) {
        cpu_device = device;
      }
    }
    grappler::VirtualCluster cluster(device_set_);
    GraphDef new_graph;
    TF_RETURN_IF_ERROR(grappler::RunMetaOptimizer(
        item, session_options_->config, cpu_device, &cluster, &new_graph));
 
......
}

函数调用栈:RunMetaOptimizer->Optimize->OptimizeGraph。
在方法OptimizeGraph里面,通过InitializeOptimizers获取所有的优化器optimizers,然后根据配置的迭代次数,用每个optimizer进行优化:optimizer->Optimize。

//code in meta_optimizer.cc
Status RunMetaOptimizer(const GrapplerItem& item, const ConfigProto& cfg,
                        DeviceBase* cpu_device, Cluster* cluster,
                        GraphDef* optimized_graph) {
  MetaOptimizer optimizer(cpu_device, cfg);
  optimizer.set_deadline_usec(
      DeadlineMicroSeconds(cfg.graph_options().rewrite_options()));
  Status status = optimizer.Optimize(cluster, item, optimized_graph);
  if (!status.ok()) {
    *optimized_graph = item.graph;
  }
  return status;
}
 
Status MetaOptimizer::Optimize(Cluster* cluster, const GrapplerItem& item,
                               GraphDef* optimized_graph) {
  VLOG(1) << "Starting optimization for grappler item: " << item.id;
......
 
  // 1. Optimize main graph
  TF_RETURN_IF_ERROR(OptimizeGraph(cluster, trimmed_item, optimized_graph));
  VLOG(1) << "Optimized main graph.";
  GRAPPLER_RETURN_IF_DEADLINE_EXCEEDED();
 
......
 
  // 2. Optimize functions reachable from the optimized graph.
  FunctionLibraryDefinition flib = minimized_flib(*optimized_graph);
 
  // Find functions for which we might need to compute a gradient at runtime.
  absl::flat_hash_set<string> differentiable_functions;
  for (const NodeDef& node : optimized_graph->node()) {
    if (IsSymbolicGradient(node)) {
      const auto* f_attr = gtl::FindOrNull(node.attr(), "f");
      if (f_attr) differentiable_functions.insert(f_attr->func().name());
    }
  }
......
}
 
Status MetaOptimizer::OptimizeGraph(Cluster* cluster, const GrapplerItem& item,
                                    GraphDef* optimized_graph) {
......
  std::vector<std::unique_ptr<GraphOptimizer>> optimizers;
  if (cfg_.optimizers().empty()) {
    TF_RETURN_IF_ERROR(InitializeOptimizers(&optimizers));
  } else {
    TF_RETURN_IF_ERROR(InitializeOptimizersByName(&optimizers));
  }
......
  for (int iteration = 0; iteration < NumIterations(cfg_); ++iteration) {
......
    for (const auto& optimizer : optimizers) {
......
      RUN_OPTIMIZER_OR_RETURN_IF_ERROR(optimizer.get());
    }
......
  }
 
Status MetaOptimizer::RunOptimizer(
    GraphOptimizer* optimizer, Cluster* cluster, GrapplerItem* optimized_item,
    GraphDef* optimized_graph, GraphOptimizationResult* optimization_result) {
......
  Status status =
      optimizer->Optimize(cluster, *optimized_item, optimized_graph);
  uint64 end_us = Env::Default()->NowMicros();
......
}

目前tensoflow实现的优化器有:

tensorflow::grappler::GraphOptimizer
	tensorflow::grappler::ArithmeticOptimizer
	tensorflow::grappler::AutoParallel
	tensorflow::grappler::ConstantFolding
	tensorflow::grappler::CustomGraphOptimizer
	tensorflow::grappler::DebugStripper
	tensorflow::grappler::DependencyOptimizer
	tensorflow::grappler::FunctionOptimizer
	tensorflow::grappler::LayoutOptimizer
	tensorflow::grappler::LoopOptimizer
	tensorflow::grappler::MemoryOptimizer
	tensorflow::grappler::MetaOptimizer
	tensorflow::grappler::ModelPruner
	tensorflow::grappler::PinToHostOptimizer
	tensorflow::grappler::Remapper
	tensorflow::grappler::ScopedAllocatorOptimizer
	tensorflow::grappler::ShapeOptimizer

优化后的图为:
n2为常量2 Const Operation;
n3为tensor A,_Recv Operation;
n4为AddN Operation,负责把n3和n2加起来;
n7为_Send Operation, 把结果输出。

2019-07-06 03:42:13.359152: I tensorflow/core/common_runtime/direct_session.cc:1592] Created 
() -> () {
  n5 = _Recv[client_terminated=true, recv_device="/job:localhost/replica:0/task:0/device:CPU:0", send_device="/job:localhost/replica:0/task:0/device:CPU:0", send_device_incarnation=2398593294780023302, tensor_name="B:0", tensor_type=int32, device=CPU:0]()
  n2 = Const[dtype=int32, value=Tensor<type: int32 shape: [] values: 2>, device=CPU:0]()
  n3 = _Recv[client_terminated=true, recv_device="/job:localhost/replica:0/task:0/device:CPU:0", send_device="/job:localhost/replica:0/task:0/device:CPU:0", send_device_incarnation=2398593294780023302, tensor_name="A:0", tensor_type=int32, device=CPU:0]()
  n4 = AddN[N=2, T=int32, device=CPU:0](n3, n2)
  n6 = AddN[N=2, T=int32, device=CPU:0](n4, n5)
  n8 = _Send[T=int32, client_terminated=true, recv_device="/job:localhost/replica:0/task:0/device:CPU:0", send_device="/job:localhost/replica:0/task:0/device:CPU:0", send_device_incarnation=2398593294780023302, tensor_name="plusB:0", device=CPU:0](n6)
  n7 = _Send[T=int32, client_terminated=true, recv_device="/job:localhost/replica:0/task:0/device:CPU:0", send_device="/job:localhost/replica:0/task:0/device:CPU:0", send_device_incarnation=2398593294780023302, tensor_name="plus2:0", device=CPU:0](n4)
}
 for /job:localhost/replica:0/task:0/device:CPU:0

本作品采用知识共享署名 4.0 国际许可协议进行许可。

发表回复