如何从SFDC拉取大量数据

Database and Ruby, Python, History


通常从 Salesforce 拉取数据,我们第一反应就是用restforce包去搞定所有的事情,但是restforce有它的局限性:

  1. 不适合请求大量数据。restforce 只支持同步操作,意味着你要一直等待,直到结果返回。当数据量比较小的时候,没有什么问题,但是当请求数据量大的时候,可能会很慢很慢。
  2. 对异常编码的处理。restforce 会对请求进行 json 转换,而 SFDC 那边可能会有一些 UTF-16 编码的数据少了一半,导致解析失败而报错。

当然还有(salesforcebulk)[https://rubygems.org/gems/salesforcebulk]这样的包,但是这个 Package 是基于 Salesforce Bulk API 1.0,而在 Salesforce Bulk API 2.0 推出之后就没有进行更新了。

所以,要从 SFDC 拉取大量数据,需要以下几点:

  1. 使用 Salesforce Bulk API 2.0。使用上 2.0 会更加简单些,省去了区分 batch 和 job。
  2. 能够 handle encoding error。

salesforcebulk能够解决上面的第二点,第一点其实也不是特别必须,但有一点是新方法和salesforcebulk都需要处理的,就是返回数据中间的换行符。 Salesforce Bulk API 无论是 1.0 还是 2.0,返回的数据格式都是text/csv,意味着一旦数据中有换行符,就会导致 csv 解析失败。

那么就开干吧!

首先是authentication。 Salesforce 支持 OAUTH2 认证,通过password模式,换取临时token

module SalesforceBulk2
  class Client
    def initialize
    end

    def authenticate
      url = URI("https://#{@instance_url}/services/oauth2/token")
      https = Net::HTTP.new(url.host, url.port)
      https.use_ssl = true

      request = Net::HTTP::Post.new(url)
      request["Content-Type"] = "application/x-www-form-urlencoded"

      request.body = "grant_type=password"
      request.body += "&client_id=#{ENV['SALESFORCE_CLIENT_ID']}"
      request.body += "&client_secret=#{ENV['SALESFORCE_CLIENT_SECRET']}"
      request.body += "&username=#{ENV['SALESFORCE_USERNAME']}&password=#{ENV['SALESFORCE_PASSWORD'] + ENV['SALESFORCE_SECURITY_TOKEN']}"

      response = https.request(request)
      @token = JSON.parse(response.body)['access_token']
    end
end

接下来就是添加Job和创建query.

module SalesforcBulk2
  class Client

    def add_job(soql)
      url = URI("https://#{@instance_url}/services/data/v58.0/jobs/query")

      https = Net::HTTP.new(url.host, url.port)
      https.use_ssl = true

      request = Net::HTTP::Post.new(url)
      request["X-PrettyPrint"] = "1"
      request["Content-Type"] = "application/json"
      request["Authorization"] = "Bearer #{@token}"

      request.body = JSON.dump({
        "operation": "query",
        "query": soql
      })

      response = https.request(request)
      JSON.parse(response.body)
    end
  end
end

创建好任务之后,就是轮询去查询 Job 的状态,直到 JobComplete

module SalesforcBulk2
  class Client
    def job_completed?(job)
      job_id = job['id']
      loop do
        puts "Waiting for #{job_id}"

        url = URI("https://#{@instance_url}/services/data/v58.0/jobs/query/#{job_id}")

        https = Net::HTTP.new(url.host, url.port)
        https.use_ssl = true

        request = Net::HTTP::Get.new(url)
        request["X-PrettyPrint"] = "1"
        request["Authorization"] = "Bearer #{@token}"

        response = https.request(request)
        return true if JSON.parse(response.body)['state'] == 'JobComplete'

        puts "job status: #{JSON.parse(response.body)['state']}"

        sleep 10
      end
    end
  end
end

再获取 Job 的查询结果。


    def query_result(job)
      job_id = job['id']
      url = URI("https://#{@instance_url}/services/data/v58.0/jobs/query/#{job_id}/results")

      https = Net::HTTP.new(url.host, url.port)
      https.use_ssl = true

      request = Net::HTTP::Get.new(url)
      request["X-PrettyPrint"] = "1"
      request["Authorization"] = "Bearer #{@token}"

      response = https.request(request)
      lines = response.body.gsub(/(?<!")\r?\n/, ' ').lines.to_a

      headers = CSV.parse_line(lines.shift).collect { |header| header.to_sym }

      result = []

      CSV.parse(lines.join, headers: headers, liberal_parsing: true) do |row|
        result << Hash[row.headers.zip(row.fields)]
      end

      result
    end

而上面的代码针对CSV中的换行符做了特别的处理,将\r\n\n转换为空格,这样他们不再拆分成新的一行。

lines = response.body.gsub(/(?<!")\r?\n/, ' ').lines.to_a

使用上,和restforce差别不大。

sf_bulk_client = SalesforceBulk2::Client.new.tap(&:authenticate)

job = sf_bulk_client.add_job(soql)
raw_rows = nil
if sf_bulk_client.job_completed?(job)
  raw_rows = sf_bulk_client.query_result(job)
  puts "completed - #{raw_rows.size}"
end