Bài 28: Cập nhật tập dữ liệu lớn
Đôi khi chúng ta cần cập nhật hàng nghìn tài nguyên trong một thao tác duy nhất, như được diễn đạt trong bình luận sau đây (được đăng trong một nhóm cộng đồng về WordPress):
Tôi thấy rằng với nhiều khách hàng, tôi đang làm việc với các tập dữ liệu lớn (hơn 10.000 biến thể sản phẩm cho 1 sản phẩm, hoặc hơn 13.000 tệp phương tiện) ... không thể tránh khỏi việc các khách hàng muốn có thể chỉnh sửa hàng loạt nhiều thứ cùng lúc - chẳng hạn như gắn thẻ 2.000 tệp phương tiện với cùng một thẻ.
Trong bài học hướng dẫn này, chúng ta sẽ khám phá các cách giải quyết nhiệm vụ này.
Nested Mutations
Để queries GraphQL này hoạt động, Cấu hình Schema được áp dụng cho endpoint cần bật Nested Mutations
Nhờ vào Nested Mutations, chúng ta có thể truy xuất và cập nhật hàng nghìn tài nguyên từ DB thông qua một queries GraphQL duy nhất:
mutation ReplaceOldWithNewDomainInPosts {
posts(pagination: { limit: 3000 }) {
id
rawContent
adaptedRawContent: _strReplace(
search: "https://my-old-domain.com"
replaceWith: "https://my-new-domain.com"
in: $__rawContent
)
update(input: {
contentAs: { html: $__adaptedRawContent }
}) {
status
errors {
__typename
...on ErrorPayload {
message
}
}
}
}
}Tuy nhiên, tùy thuộc vào khả năng chịu tải của hệ thống, một lần thực thi GraphQL duy nhất này có thể gây quá tải cho DB, thậm chí làm nó bị sập.
Phân trang việc thực thi queries GraphQL
Nếu việc cập nhật hàng nghìn tài nguyên cùng một lúc làm hệ thống bị sập, giải pháp rất đơn giản: thay vì thực thi GraphQL một lần duy nhất cho hàng nghìn tài nguyên, chúng ta có thể thực thi nó hàng trăm lần, mỗi lần cho vài chục tài nguyên.
Các đoạn script bash sau đây trước tiên tìm ra tổng số bình luận thông qua commentCount, sau đó tính toán các phân đoạn dựa trên biến môi trường $ENTRIES_TO_PROCESS, tính toán các tham số phân trang và gọi queries GraphQL cho từng phân đoạn (chỉ đơn giản là truy xuất các bình luận từ phân đoạn đó):
# Get the number of comments in the site
GRAPHQL_RESPONSE=$(curl
-X POST \
-H "Content-Type: application/json" \
-d '{"query": "{\n commentCount\n}"}' \
https://mysite.com/graphql/)
# Extract the number of comments into a variable
COMMENT_COUNT=$(echo $GRAPHQL_RESPONSE \
| grep -E -o '"commentCount\":([0-9]+)' \
| cut -d':' -f2-)
echo "Number of comments: $COMMENT_COUNT"
# How many entries will be processed on each query
ENTRIES_TO_PROCESS=10
# Calculate how many requests must be triggered
PAGINATION_COUNT=$(($(($COMMENT_COUNT / $ENTRIES_TO_PROCESS)) + $(($(($COMMENT_COUNT % $ENTRIES_TO_PROCESS)) ? 1 : 0))))
echo "Number of requests to process (at $ENTRIES_TO_PROCESS entries per request): $PAGINATION_COUNT"
# Execute the requests, at one per second
for PAGINATION_NUMBER in $(seq 0 $(($PAGINATION_COUNT - 1))); do sleep 1 && echo "\n\nPagination number: $PAGINATION_NUMBER\n" && curl -X POST -H "Content-Type: application/json" -d "{\"query\": \"{ comments(pagination: { limit: $ENTRIES_TO_PROCESS, offset: $(($PAGINATION_NUMBER * $ENTRIES_TO_PROCESS)) }) { id date content } }\"}" https://mysite.com/graphql/ ; doneThực thi queries GraphQL theo cách đệ quy
Vì giải pháp trên liên quan đến bash scripting, nó phải được thực thi qua CLI (hoặc một số bảng quản trị hay công cụ), điều này giới hạn khả năng sử dụng của nó.
Chúng ta có thể tái tạo cùng logic đó ngay vào bản thân queries GraphQL, cho phép chúng ta thực thi nó ngay trong WordPress (thậm chí có thể lưu trữ nó dưới dạng GraphQL Persisted Query).
Queries GraphQL dưới đây tự thực thi theo cách đệ quy. Khi được gọi lần đầu tiên, nó:
- Chia tổng số tài nguyên cần cập nhật thành các phân đoạn (được tính toán bằng biến
$limitđược cung cấp) - Tự thực thi thông qua một yêu cầu HTTP mới cho từng phân đoạn (truyền
$offsettương ứng như một biến), do đó chỉ cập nhật một tập hợp con của tất cả tài nguyên tại một thời điểm nhất định
Queries GraphQL là đệ quy bằng cách để các yêu cầu HTTP trỏ đến cùng URL với yêu cầu hiện tại (cộng thêm biến $offset cho phân đoạn đó), trong đó chúng ta truy xuất URL (và cả phần body, phương thức và headers) từ yêu cầu HTTP hiện tại (thông qua extension HTTP Request via Schema).
Đối số $async được truyền cho _sendHTTPRequests đã được đặt thành false, để các yêu cầu HTTP sẽ được thực thi lần lượt. Ngoài ra, biến tùy chọn $delay cho phép chỉ định số mili giây cần trì hoãn trước khi gửi mỗi yêu cầu.
Khi tất cả tài nguyên đã được cập nhật, việc thực thi queries GraphQL đạt đến điểm kết thúc và dừng lại:
# When first invoked, we do not pass variable `$offset`
# Then `$offset` is `null`, and dynamic variable `$executeQuery` will be `true`
query ExportExecute(
$offset: Int
) {
executeQuery: _notNull(value: $offset)
@export(as: "executeQuery")
@remove # Comment this directive to visualize output during development
}
# Only calculate the segments on the first invocation of the GraphQL query
query CalculateVars($limit: Int! = 10)
@depends(on: "ExportExecute")
@skip(if: $executeQuery)
{
# Calculate the number of HTTP requests to be sent
commentCount
fractionalNumberExecutions: _floatDivide(number: $__commentCount, by: $limit)
@remove # Comment this directive to visualize output during development
numberExecutions: _floatCeil(number: $__fractionalNumberExecutions)
# Generate a list of the offset
arrayOffsets: _arrayPad(array: [], length: $__numberExecutions, value: null)
@underEachArrayItem(
passIndexOnwardsAs: "position"
)
@applyField(
name: "_intMultiply"
arguments: {
multiply: $position
with: $limit
}
setResultInResponse: true
)
@export(as: "offsets")
# Vars needed to generate a list of the HTTP Request inputs,
# with many of them retrieved from the current HTTP request data
url: _httpRequestFullURL
@export(as: "url")
@remove # Comment this directive to visualize output during development
method: _httpRequestMethod
@export(as: "method")
@remove # Comment this directive to visualize output during development
headers: _httpRequestHeaders
@remove # Comment this directive to visualize output during development
headersInputList: _objectConvertToNameValueEntryList(
object: $__headers
)
@export(as: "headersInputList")
@remove # Comment this directive to visualize output during development
body: _httpRequestBody
@remove # Comment this directive to visualize output during development
bodyJSONObject: _strDecodeJSONObject(string: $__body)
@export(as: "bodyJSONObject")
@remove # Comment this directive to visualize output during development
bodyHasVariables: _propertyIsSetInJSONObject(
object: $__bodyJSONObject,
by: { key: "variables" }
)
@export(as: "bodyHasVariables")
@remove # Comment this directive to visualize output during development
}
query GenerateVars
@depends(on: ["ExportExecute", "CalculateVars"])
@skip(if: $executeQuery)
{
bodyJSON: _echo(value: $bodyJSONObject)
@unless(condition: $bodyHasVariables)
@objectAddEntry(
key: "variables"
value: {}
)
@export(as: "bodyJSON")
@remove # Comment this directive to visualize output during development
}
# Generate all the HTTPRequestInput objects to send each of the HTTP requests
query GenerateRequestInputs(
$timeout: Float,
$delay: Int
)
@depends(on: ["ExportExecute", "GenerateVars"])
@skip(if: $executeQuery)
{
# Generate a list of the HTTP Request inputs (without the offset)
requestInputs: _echo(value: $offsets)
@underEachArrayItem(
passValueOnwardsAs: "requestOffset"
affectDirectivesUnderPos: [1, 2]
)
@applyField(
name: "_objectAddEntry",
arguments: {
object: $bodyJSON
underPath: "variables"
key: "offset"
value: $requestOffset
},
passOnwardsAs: "itemJSON"
)
@applyField(
name: "_echo",
arguments: {
value: {
url: $url
method: $method
options: {
headers: $headersInputList
json: $itemJSON
timeout: $timeout
delay: $delay
}
}
},
setResultInResponse: true
)
@export(as: "requestInputs")
@remove # Comment this directive to visualize output during development
}
# Execute all the generated URLs, either asynchronously or not
query ExecuteURLs
@depends(on: ["ExportExecute", "GenerateRequestInputs"])
@skip(if: $executeQuery)
{
_sendHTTPRequests(
async: false
inputs: $requestInputs
) {
statusCode
contentType
body
@remove
bodyJSON: _strDecodeJSONObject(string: $__body)
}
}
# This is the actual execution of the query.
# In this case, it simply prints the time when it was executed,
# the provided query variables, and the comment IDs for that segment
query ExecuteQuery(
$offset: Int
$limit: Int! = 10
)
@depends(on: "ExportExecute")
@include(if: $executeQuery)
{
executionTime: _httpRequestRequestTime
queryVariables: _sprintf(string: "[$limit: %s, $offset: %s]", values: [$limit, $offset])
comments(
pagination: { limit: $limit, offset: $offset }
sort: { order: ASC, by: ID }
) {
id
}
}
query ExecuteAll
@depends(on: ["ExecuteURLs", "ExecuteQuery"])
{
id
@remove
}Phản hồi là:
{
"data": {
"commentCount": 23,
"numberExecutions": 3,
"arrayOffsets": [
0,
10,
20
],
"_sendHTTPRequests": [
{
"statusCode": 200,
"contentType": "application/json",
"bodyJSON": {
"data": {
"executionTime": 1689814467,
"queryVariables": "[$limit: 10, $offset: 0]",
"comments": [
{
"id": 2
},
{
"id": 3
},
{
"id": 4
},
{
"id": 5
},
{
"id": 6
},
{
"id": 7
},
{
"id": 8
},
{
"id": 9
},
{
"id": 10
},
{
"id": 11
}
]
}
}
},
{
"statusCode": 200,
"contentType": "application/json",
"bodyJSON": {
"data": {
"executionTime": 1689814468,
"queryVariables": "[$limit: 10, $offset: 10]",
"comments": [
{
"id": 12
},
{
"id": 13
},
{
"id": 16
},
{
"id": 17
},
{
"id": 18
},
{
"id": 19
},
{
"id": 20
},
{
"id": 21
},
{
"id": 22
},
{
"id": 23
}
]
}
}
},
{
"statusCode": 200,
"contentType": "application/json",
"bodyJSON": {
"data": {
"executionTime": 1689814470,
"queryVariables": "[$limit: 10, $offset: 20]",
"comments": [
{
"id": 24
},
{
"id": 25
},
{
"id": 26
}
]
}
}
}
]
}
}