Hướng dẫn schema
Hướng dẫn schemaBài 28: Cập nhật tập dữ liệu lớn

Bài 28: Cập nhật tập dữ liệu lớn

Đôi khi chúng ta cần cập nhật hàng nghìn tài nguyên trong một thao tác duy nhất, như được diễn đạt trong bình luận sau đây (được đăng trong một nhóm cộng đồng về WordPress):

Tôi thấy rằng với nhiều khách hàng, tôi đang làm việc với các tập dữ liệu lớn (hơn 10.000 biến thể sản phẩm cho 1 sản phẩm, hoặc hơn 13.000 tệp phương tiện) ... không thể tránh khỏi việc các khách hàng muốn có thể chỉnh sửa hàng loạt nhiều thứ cùng lúc - chẳng hạn như gắn thẻ 2.000 tệp phương tiện với cùng một thẻ.

Trong bài học hướng dẫn này, chúng ta sẽ khám phá các cách giải quyết nhiệm vụ này.

Nested Mutations

Để queries GraphQL này hoạt động, Cấu hình Schema được áp dụng cho endpoint cần bật Nested Mutations

Nhờ vào Nested Mutations, chúng ta có thể truy xuất và cập nhật hàng nghìn tài nguyên từ DB thông qua một queries GraphQL duy nhất:

mutation ReplaceOldWithNewDomainInPosts {
  posts(pagination: { limit: 3000 }) {
    id
    rawContent
    adaptedRawContent: _strReplace(
      search: "https://my-old-domain.com"
      replaceWith: "https://my-new-domain.com"
      in: $__rawContent
    )
    update(input: {
      contentAs: { html: $__adaptedRawContent }
    }) {
      status
      errors {
        __typename
        ...on ErrorPayload {
          message
        }
      }
    }
  }
}

Tuy nhiên, tùy thuộc vào khả năng chịu tải của hệ thống, một lần thực thi GraphQL duy nhất này có thể gây quá tải cho DB, thậm chí làm nó bị sập.

Phân trang việc thực thi queries GraphQL

Nếu việc cập nhật hàng nghìn tài nguyên cùng một lúc làm hệ thống bị sập, giải pháp rất đơn giản: thay vì thực thi GraphQL một lần duy nhất cho hàng nghìn tài nguyên, chúng ta có thể thực thi nó hàng trăm lần, mỗi lần cho vài chục tài nguyên.

Các đoạn script bash sau đây trước tiên tìm ra tổng số bình luận thông qua commentCount, sau đó tính toán các phân đoạn dựa trên biến môi trường $ENTRIES_TO_PROCESS, tính toán các tham số phân trang và gọi queries GraphQL cho từng phân đoạn (chỉ đơn giản là truy xuất các bình luận từ phân đoạn đó):

# Get the number of comments in the site
GRAPHQL_RESPONSE=$(curl
  -X POST \
  -H "Content-Type: application/json" \
  -d '{"query": "{\n  commentCount\n}"}' \
  https://mysite.com/graphql/)
 
# Extract the number of comments into a variable
COMMENT_COUNT=$(echo $GRAPHQL_RESPONSE \
  | grep -E -o '"commentCount\":([0-9]+)' \
  | cut -d':' -f2-)
 
echo "Number of comments: $COMMENT_COUNT"
 
# How many entries will be processed on each query
ENTRIES_TO_PROCESS=10
 
# Calculate how many requests must be triggered
PAGINATION_COUNT=$(($(($COMMENT_COUNT / $ENTRIES_TO_PROCESS)) + $(($(($COMMENT_COUNT % $ENTRIES_TO_PROCESS)) ? 1 : 0))))
 
echo "Number of requests to process (at $ENTRIES_TO_PROCESS entries per request): $PAGINATION_COUNT"
 
# Execute the requests, at one per second
for PAGINATION_NUMBER in $(seq 0 $(($PAGINATION_COUNT - 1))); do sleep 1 && echo "\n\nPagination number: $PAGINATION_NUMBER\n" && curl -X POST -H "Content-Type: application/json" -d "{\"query\": \"{ comments(pagination: { limit: $ENTRIES_TO_PROCESS, offset: $(($PAGINATION_NUMBER * $ENTRIES_TO_PROCESS)) }) { id date content } }\"}" https://mysite.com/graphql/ ; done

Thực thi queries GraphQL theo cách đệ quy

Vì giải pháp trên liên quan đến bash scripting, nó phải được thực thi qua CLI (hoặc một số bảng quản trị hay công cụ), điều này giới hạn khả năng sử dụng của nó.

Chúng ta có thể tái tạo cùng logic đó ngay vào bản thân queries GraphQL, cho phép chúng ta thực thi nó ngay trong WordPress (thậm chí có thể lưu trữ nó dưới dạng GraphQL Persisted Query).

Queries GraphQL dưới đây tự thực thi theo cách đệ quy. Khi được gọi lần đầu tiên, nó:

  • Chia tổng số tài nguyên cần cập nhật thành các phân đoạn (được tính toán bằng biến $limit được cung cấp)
  • Tự thực thi thông qua một yêu cầu HTTP mới cho từng phân đoạn (truyền $offset tương ứng như một biến), do đó chỉ cập nhật một tập hợp con của tất cả tài nguyên tại một thời điểm nhất định

Queries GraphQL là đệ quy bằng cách để các yêu cầu HTTP trỏ đến cùng URL với yêu cầu hiện tại (cộng thêm biến $offset cho phân đoạn đó), trong đó chúng ta truy xuất URL (và cả phần body, phương thức và headers) từ yêu cầu HTTP hiện tại (thông qua extension HTTP Request via Schema).

Đối số $async được truyền cho _sendHTTPRequests đã được đặt thành false, để các yêu cầu HTTP sẽ được thực thi lần lượt. Ngoài ra, biến tùy chọn $delay cho phép chỉ định số mili giây cần trì hoãn trước khi gửi mỗi yêu cầu.

Khi tất cả tài nguyên đã được cập nhật, việc thực thi queries GraphQL đạt đến điểm kết thúc và dừng lại:

# When first invoked, we do not pass variable `$offset`
# Then `$offset` is `null`, and dynamic variable `$executeQuery` will be `true`
query ExportExecute(
  $offset: Int
) {
  executeQuery: _notNull(value: $offset)
    @export(as: "executeQuery")
    @remove # Comment this directive to visualize output during development
}
 
# Only calculate the segments on the first invocation of the GraphQL query
query CalculateVars($limit: Int! = 10)
  @depends(on: "ExportExecute")
  @skip(if: $executeQuery)
{
  # Calculate the number of HTTP requests to be sent
  commentCount
  fractionalNumberExecutions: _floatDivide(number: $__commentCount, by: $limit)
    @remove # Comment this directive to visualize output during development
  numberExecutions: _floatCeil(number: $__fractionalNumberExecutions)
  
  # Generate a list of the offset
  arrayOffsets: _arrayPad(array: [], length: $__numberExecutions, value: null)
    @underEachArrayItem(
      passIndexOnwardsAs: "position"
    )
      @applyField(
        name: "_intMultiply"
        arguments: {
          multiply: $position
          with: $limit
        }
        setResultInResponse: true
      )
    @export(as: "offsets")
 
  # Vars needed to generate a list of the HTTP Request inputs,
  # with many of them retrieved from the current HTTP request data
  url: _httpRequestFullURL
    @export(as: "url")
    @remove # Comment this directive to visualize output during development
  method: _httpRequestMethod
    @export(as: "method")
    @remove # Comment this directive to visualize output during development
  headers: _httpRequestHeaders
    @remove # Comment this directive to visualize output during development
  headersInputList: _objectConvertToNameValueEntryList(
    object: $__headers
  )
    @export(as: "headersInputList")
    @remove # Comment this directive to visualize output during development
  body: _httpRequestBody
    @remove # Comment this directive to visualize output during development
  bodyJSONObject: _strDecodeJSONObject(string: $__body)
    @export(as: "bodyJSONObject")
    @remove # Comment this directive to visualize output during development
  bodyHasVariables: _propertyIsSetInJSONObject(
    object: $__bodyJSONObject,
    by: { key: "variables" }
  )
    @export(as: "bodyHasVariables")
    @remove # Comment this directive to visualize output during development
}
 
query GenerateVars
  @depends(on: ["ExportExecute", "CalculateVars"])
  @skip(if: $executeQuery)
{
  bodyJSON: _echo(value: $bodyJSONObject)
    @unless(condition: $bodyHasVariables)
      @objectAddEntry(
        key: "variables"
        value: {}
      )
    @export(as: "bodyJSON")
    @remove # Comment this directive to visualize output during development
}
 
# Generate all the HTTPRequestInput objects to send each of the HTTP requests
query GenerateRequestInputs(
  $timeout: Float,
  $delay: Int
)
  @depends(on: ["ExportExecute", "GenerateVars"])
  @skip(if: $executeQuery)
{
  # Generate a list of the HTTP Request inputs (without the offset)
  requestInputs: _echo(value: $offsets)
    @underEachArrayItem(
      passValueOnwardsAs: "requestOffset"
      affectDirectivesUnderPos: [1, 2]
    )
      @applyField(
        name: "_objectAddEntry",
        arguments: {
          object: $bodyJSON
          underPath: "variables"
          key: "offset"
          value: $requestOffset
        },
        passOnwardsAs: "itemJSON"
      )
      @applyField(
        name: "_echo",
        arguments: {
          value: {
            url: $url
            method: $method
            options: {
              headers: $headersInputList
              json: $itemJSON
              timeout: $timeout
              delay: $delay
            }
          }
        },
        setResultInResponse: true
      )
    @export(as: "requestInputs")
    @remove # Comment this directive to visualize output during development
}
 
# Execute all the generated URLs, either asynchronously or not
query ExecuteURLs
  @depends(on: ["ExportExecute", "GenerateRequestInputs"])
  @skip(if: $executeQuery)
{
  _sendHTTPRequests(
    async: false
    inputs: $requestInputs
  ) {
    statusCode
    contentType
    body
      @remove
    bodyJSON: _strDecodeJSONObject(string: $__body)
  }
}
 
# This is the actual execution of the query.
# In this case, it simply prints the time when it was executed,
# the provided query variables, and the comment IDs for that segment
query ExecuteQuery(
  $offset: Int
  $limit: Int! = 10
)
  @depends(on: "ExportExecute")
  @include(if: $executeQuery)
{
  executionTime: _httpRequestRequestTime
  queryVariables: _sprintf(string: "[$limit: %s, $offset: %s]", values: [$limit, $offset])
  comments(
    pagination: { limit: $limit, offset: $offset }
    sort: { order: ASC, by: ID }
  ) {
    id
  }
}
 
query ExecuteAll
  @depends(on: ["ExecuteURLs", "ExecuteQuery"])
{
  id
    @remove
}

Phản hồi là:

{
  "data": {
    "commentCount": 23,
    "numberExecutions": 3,
    "arrayOffsets": [
      0,
      10,
      20
    ],
    "_sendHTTPRequests": [
      {
        "statusCode": 200,
        "contentType": "application/json",
        "bodyJSON": {
          "data": {
            "executionTime": 1689814467,
            "queryVariables": "[$limit: 10, $offset: 0]",
            "comments": [
              {
                "id": 2
              },
              {
                "id": 3
              },
              {
                "id": 4
              },
              {
                "id": 5
              },
              {
                "id": 6
              },
              {
                "id": 7
              },
              {
                "id": 8
              },
              {
                "id": 9
              },
              {
                "id": 10
              },
              {
                "id": 11
              }
            ]
          }
        }
      },
      {
        "statusCode": 200,
        "contentType": "application/json",
        "bodyJSON": {
          "data": {
            "executionTime": 1689814468,
            "queryVariables": "[$limit: 10, $offset: 10]",
            "comments": [
              {
                "id": 12
              },
              {
                "id": 13
              },
              {
                "id": 16
              },
              {
                "id": 17
              },
              {
                "id": 18
              },
              {
                "id": 19
              },
              {
                "id": 20
              },
              {
                "id": 21
              },
              {
                "id": 22
              },
              {
                "id": 23
              }
            ]
          }
        }
      },
      {
        "statusCode": 200,
        "contentType": "application/json",
        "bodyJSON": {
          "data": {
            "executionTime": 1689814470,
            "queryVariables": "[$limit: 10, $offset: 20]",
            "comments": [
              {
                "id": 24
              },
              {
                "id": 25
              },
              {
                "id": 26
              }
            ]
          }
        }
      }
    ]
  }
}