通常从 Salesforce 拉取数据,我们第一反应就是用restforce
包去搞定所有的事情,但是restforce
有它的局限性:
- 不适合请求大量数据。restforce 只支持同步操作,意味着你要一直等待,直到结果返回。当数据量比较小的时候,没有什么问题,但是当请求数据量大的时候,可能会很慢很慢。
- 对异常编码的处理。restforce 会对请求进行 json 转换,而 SFDC 那边可能会有一些 UTF-16 编码的数据少了一半,导致解析失败而报错。
当然还有(salesforcebulk)[https://rubygems.org/gems/salesforcebulk]这样的包,但是这个 Package 是基于 Salesforce Bulk API 1.0,而在 Salesforce Bulk API 2.0 推出之后就没有进行更新了。
所以,要从 SFDC 拉取大量数据,需要以下几点:
- 使用 Salesforce Bulk API 2.0。使用上 2.0 会更加简单些,省去了区分 batch 和 job。
- 能够 handle encoding error。
salesforcebulk
能够解决上面的第二点,第一点其实也不是特别必须,但有一点是新方法和salesforcebulk
都需要处理的,就是返回数据中间的换行符。 Salesforce Bulk API 无论是 1.0 还是 2.0,返回的数据格式都是text/csv
,意味着一旦数据中有换行符,就会导致 csv 解析失败。
那么就开干吧!
首先是authentication
。 Salesforce 支持 OAUTH2 认证,通过password
模式,换取临时token
。
module SalesforceBulk2
class Client
def initialize
end
def authenticate
url = URI("https://#{@instance_url}/services/oauth2/token")
https = Net::HTTP.new(url.host, url.port)
https.use_ssl = true
request = Net::HTTP::Post.new(url)
request["Content-Type"] = "application/x-www-form-urlencoded"
request.body = "grant_type=password"
request.body += "&client_id=#{ENV['SALESFORCE_CLIENT_ID']}"
request.body += "&client_secret=#{ENV['SALESFORCE_CLIENT_SECRET']}"
request.body += "&username=#{ENV['SALESFORCE_USERNAME']}&password=#{ENV['SALESFORCE_PASSWORD'] + ENV['SALESFORCE_SECURITY_TOKEN']}"
response = https.request(request)
@token = JSON.parse(response.body)['access_token']
end
end
接下来就是添加Job
和创建query
.
module SalesforcBulk2
class Client
def add_job(soql)
url = URI("https://#{@instance_url}/services/data/v58.0/jobs/query")
https = Net::HTTP.new(url.host, url.port)
https.use_ssl = true
request = Net::HTTP::Post.new(url)
request["X-PrettyPrint"] = "1"
request["Content-Type"] = "application/json"
request["Authorization"] = "Bearer #{@token}"
request.body = JSON.dump({
"operation": "query",
"query": soql
})
response = https.request(request)
JSON.parse(response.body)
end
end
end
创建好任务之后,就是轮询去查询 Job 的状态,直到 JobComplete
module SalesforcBulk2
class Client
def job_completed?(job)
job_id = job['id']
loop do
puts "Waiting for #{job_id}"
url = URI("https://#{@instance_url}/services/data/v58.0/jobs/query/#{job_id}")
https = Net::HTTP.new(url.host, url.port)
https.use_ssl = true
request = Net::HTTP::Get.new(url)
request["X-PrettyPrint"] = "1"
request["Authorization"] = "Bearer #{@token}"
response = https.request(request)
return true if JSON.parse(response.body)['state'] == 'JobComplete'
puts "job status: #{JSON.parse(response.body)['state']}"
sleep 10
end
end
end
end
再获取 Job 的查询结果。
def query_result(job)
job_id = job['id']
url = URI("https://#{@instance_url}/services/data/v58.0/jobs/query/#{job_id}/results")
https = Net::HTTP.new(url.host, url.port)
https.use_ssl = true
request = Net::HTTP::Get.new(url)
request["X-PrettyPrint"] = "1"
request["Authorization"] = "Bearer #{@token}"
response = https.request(request)
lines = response.body.gsub(/(?<!")\r?\n/, ' ').lines.to_a
headers = CSV.parse_line(lines.shift).collect { |header| header.to_sym }
result = []
CSV.parse(lines.join, headers: headers, liberal_parsing: true) do |row|
result << Hash[row.headers.zip(row.fields)]
end
result
end
而上面的代码针对CSV
中的换行符做了特别的处理,将\r\n
和\n
转换为空格,这样他们不再拆分成新的一行。
lines = response.body.gsub(/(?<!")\r?\n/, ' ').lines.to_a
使用上,和restforce
差别不大。
sf_bulk_client = SalesforceBulk2::Client.new.tap(&:authenticate)
job = sf_bulk_client.add_job(soql)
raw_rows = nil
if sf_bulk_client.job_completed?(job)
raw_rows = sf_bulk_client.query_result(job)
puts "completed - #{raw_rows.size}"
end