The Wikimedia Enterprise Realtime API, our firehose stream of events, is an essential tool for engineering teams that require access to real-time changes in Wikimedia projects.
With over one million daily events across all supported Wikimedia projects, including over 150,000 daily events for the English Wikipedia alone, the necessity for robust data handling capabilities is clear. In response, we’re introducing two significant features to the Realtime API: Parallel Connections and Restart support.
These enhancements are designed to improve data throughput and ensure seamless data ingestion, even if a network interruption occurs. To support them, we’ve introduced three new query parameters along with three new fields in the response event object; using these unlocks the new capabilities of the Realtime API.
New Query Parameters
- parts (array of integers): Target subsets of partitions for each connection.
- offsets (map of string:int): Start from specific offsets for each partition.
- since_per_partition (map of string:string, date in RFC3339): Start from specific timestamps for each partition.
All query parameters, including the preexisting since parameter, are documented in the Article Streaming docs.
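If you call the endpoint directly rather than through an SDK, these parameters travel in the JSON request body, just like the examples later in this post. The following is a minimal, hedged sketch in Go: the endpoint URL and the WME_ACCESS_TOKEN environment variable are assumptions for illustration, and the values are borrowed from the examples below.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
)

// request body combining the new query parameters; the JSON keys mirror the
// documented names (parts, offsets, since_per_partition)
type realtimeRequest struct {
	Parts             []int             `json:"parts,omitempty"`
	Offsets           map[string]int64  `json:"offsets,omitempty"`
	SincePerPartition map[string]string `json:"since_per_partition,omitempty"`
}

func main() {
	body, err := json.Marshal(realtimeRequest{
		Parts:             []int{0, 1},
		Offsets:           map[string]int64{"0": 3614782},
		SincePerPartition: map[string]string{"5": "2023-06-05T12:00:00Z"},
	})
	if err != nil {
		panic(err)
	}

	// NOTE: the endpoint URL below is an assumption for illustration; confirm
	// the exact URL in the Article Streaming docs for your account
	req, err := http.NewRequest(http.MethodPost, "https://realtime.enterprise.wikimedia.com/v2/articles", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer "+os.Getenv("WME_ACCESS_TOKEN"))
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	fmt.Println(resp.Status) // the response body streams article events
}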
New Response Fields
- event.partition (int): The partition (50 in total) this event belongs to.
- event.offset (int): This event’s offset in the partition it belongs to.
- event.date_published (string, date in RFC3339): Timestamp when this event was published to the partition it belongs to.
The full example schema of the response event object has been updated and is shown in the data dictionary.
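For reference, here is a minimal sketch of how these fields could be modeled if you decode the raw stream yourself. The JSON keys follow the names above; the Go SDK’s own event type, used in the full example later in this post, remains the authoritative shape.

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// sketch of the new event fields as they appear on each streamed article
type event struct {
	Partition     int       `json:"partition"`      // 0-49
	Offset        int64     `json:"offset"`         // position within the partition
	DatePublished time.Time `json:"date_published"` // RFC3339 publish timestamp
}

func main() {
	raw := []byte(`{"partition": 0, "offset": 3614782, "date_published": "2024-02-29T19:41:29.98Z"}`)

	var e event
	if err := json.Unmarshal(raw, &e); err != nil {
		panic(err)
	}

	fmt.Printf("partition=%d offset=%d published=%s\n", e.Partition, e.Offset, e.DatePublished)
}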
How to create Parallel Connections
To connect to the Realtime API with more than one connection, you’ll use the newly introduced parts query parameter, which lets you target subsets of partitions across parallel connections. The maximum number of parallel connections is 10, each handling one or more parts numbered 0 through 9, and each part represents one tenth of the 50 partitions (five partitions per part). When using parts, it’s crucial to cover all partitions (0-49) across your connections; omitting a part means missing events from its partitions. The flexibility of the parts parameter enables various configurations, from distributing partitions evenly across multiple connections to targeting all partitions within a single connection. A Go sketch of running several connections in parallel follows the scenarios below.
Example Scenarios:
Ten Parallel Connections: To cover all partitions, configure each connection to handle a specific subset.
- Connection 1: "parts": [0] (Partitions 0-4)
- Connection 2: "parts": [1] (Partitions 5-9)
- Connection 3: "parts": [2] (Partitions 10-14)
- …and so on up to the 10th connection.

Two Parallel Connections: Combining multiple parts in fewer connections.
- Connection 1: "parts": [0, 1, 2, 3, 4] (Partitions 0-24)
- Connection 2: "parts": [5, 6, 7, 8, 9] (Partitions 25-49)

Single Connection: Targeting all partitions in one connection (the default when parts is not used).
- Connection 1: "parts": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] (Partitions 0-49)
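To make the ten-connection scenario concrete, here is a hedged sketch using the Go SDK calls shown in the full example later in this post (api.NewClient, SetAccessToken, api.Request, StreamArticles). Authentication is simplified to an assumed WME_ACCESS_TOKEN environment variable, and error handling is kept minimal.

package main

import (
	"context"
	"log"
	"os"
	"sync"

	"github.com/wikimedia-enterprise/wme-sdk-go/pkg/api"
)

// streamAllParts runs the ten-connection scenario: one goroutine per part,
// together covering partitions 0-49
func streamAllParts(ctx context.Context, accessToken string) {
	var wg sync.WaitGroup

	for part := 0; part < 10; part++ {
		wg.Add(1)

		go func(part int) {
			defer wg.Done()

			client := api.NewClient()
			client.SetAccessToken(accessToken)

			req := &api.Request{
				Parts:  []int{part}, // one part (five partitions) per connection
				Fields: []string{"name", "event.*"},
			}

			// blocks while the stream is open
			if err := client.StreamArticles(ctx, req, func(art *api.Article) error {
				log.Printf("part %d: %s", part, art.Name)
				return nil
			}); err != nil {
				log.Printf("part %d stream ended: %v", part, err)
			}
		}(part)
	}

	wg.Wait()
}

func main() {
	// WME_ACCESS_TOKEN is assumed to hold a token obtained as in the full
	// login example later in this post
	streamAllParts(context.Background(), os.Getenv("WME_ACCESS_TOKEN"))
}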
How to Reconnect to the Realtime API with Restart Support
If a connection interruption occurs, the Wikimedia Enterprise Realtime API now supports a Restart feature that lets data streaming recover smoothly without missing events. Each article emitted by the API includes an event object with the existing since field alongside the three new fields, all useful for reconnecting. You’ll want to maintain a map of the last consumed since, partition offset, and partition date_published so you can reconnect without data loss. We retain events for 48 hours, which gives you enough time to recover data missed during the disruption.
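As a concrete picture of that bookkeeping, here is a minimal sketch of a per-partition checkpoint; the type and field names are illustrative, not part of the API.

package main

import "fmt"

// illustrative checkpoint for one partition; persist one of these per
// partition so a restarted consumer can resume where it left off
type partitionCheckpoint struct {
	Offset        int64  // last consumed event.offset
	DatePublished string // last consumed event.date_published (RFC3339)
}

func main() {
	// keyed by partition number (0-49); update it on every consumed event and
	// keep it in persistent storage within the 48-hour retention window
	checkpoints := map[int]partitionCheckpoint{
		0: {Offset: 3614782, DatePublished: "2024-02-29T19:41:29.98Z"},
	}

	fmt.Printf("%+v\n", checkpoints)
}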
Examples:
The offsets parameter specifies, for each partition, the offset from which to resume consuming data.
{
  "parts": [0, 1],
  "offsets": {"0": 3614782, "4": 3593806, "5": 3588693}
}
The preexisting since parameter indicates the timestamp from which to start consuming data; it applies to every partition covered by the connection.
{
  "parts": [0],
  "since": "2024-02-29T19:41:29.98Z"
}
The since_per_partition parameter starts data consumption from a specific timestamp for each listed partition. While this may be useful for your applications, we don’t recommend it as a reconnection method due to potential performance issues.
{
  "parts": [0, 1],
  "since_per_partition": {
    "1": "2023-06-05T12:00:00Z",
    "5": "2023-06-05T12:00:00Z"
  }
}
You can also combine both parameters in the same request if needed.
{
  "parts": [0],
  "since_per_partition": {"1": "2023-06-05T12:00:00Z"},
  "offsets": {"0": 3614782, "4": 3593806}
}
How to do Parallel Connections and Restart using our Go SDK
We have already outlined the new query parameters and fields we introduced along with some example scenarios. While that’s more than enough to get started, we thought it would be useful to demonstrate how, using our Go SDK, you would keep your Realtime API connections up to date and fault-tolerant.
package main

import (
	"context"
	"log"
	"os"

	"github.com/wikimedia-enterprise/wme-sdk-go/pkg/api"
	"github.com/wikimedia-enterprise/wme-sdk-go/pkg/auth"
)

func main() {
	// initialize the context and the authentication client
	ctx := context.Background()
	authClient := auth.NewClient()

	// log in to the API using your username and password
	loginResponse, err := authClient.Login(ctx, &auth.LoginRequest{
		Username: os.Getenv("WME_USERNAME"),
		Password: os.Getenv("WME_PASSWORD"),
	})
	if err != nil {
		log.Fatalln(err)
	}

	// revoke the token after we are done
	defer authClient.RevokeToken(ctx, &auth.RevokeTokenRequest{
		RefreshToken: loginResponse.RefreshToken,
	})

	// initialize the API client and set the access token
	apiClient := api.NewClient()
	apiClient.SetAccessToken(loginResponse.AccessToken)

	// create a request for the Realtime API endpoint
	// we limit the request to only three fields and filter by language
	// also, we connect only to the first five parts of the stream
	// if you have another consumer, you can use this same code to connect to the next set of parts ({5, 6, 7, 8, 9})
	// the important part is to include the `event` fields in the request
	apiRequest := &api.Request{
		Parts:  []int{0, 1, 2, 3, 4},
		Fields: []string{"name", "abstract", "event.*"},
		Filters: []*api.Filter{
			{
				Field: "in_language.identifier",
				Value: "en",
			},
		},
	}

	// list of offsets for each partition
	// this needs to be stored in persistent storage and used to reconnect
	// there's a parameter called Offsets in the api.Request object for that
	offsets := map[int]int64{}

	// create a callback to read the articles
	// it will be called every time you get a new article from the stream
	apiReadCallback := func(art *api.Article) error {
		log.Println(art.Name)

		offsets[*art.Event.Partition] = *art.Event.Offset
		log.Println(offsets)
		log.Println("----------------------------------")

		return nil
	}

	// connect to the Realtime API and stream the articles
	// this is a blocking call
	err = apiClient.StreamArticles(ctx, apiRequest, apiReadCallback)
	if err != nil {
		log.Fatalln(err)
	}
}
The code above demonstrates how to extract the data needed to make your connection fault-tolerant using the offsets of the messages you have consumed. To run it, you’ll need Go installed on your machine and the WME_USERNAME and WME_PASSWORD environment variables set to your Wikimedia Enterprise credentials.
As mentioned in the code comments, you would store those offsets somewhere in persistent storage and use them each time you reconnect to the stream to resume consumption exactly where you left off.
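To close the loop, here is a hedged sketch of that reconnect step. loadOffsets is a hypothetical helper standing in for whatever persistent storage you use, and the Offsets field of api.Request (mentioned in the code comments above) is assumed to take the same map[int]int64 shape as the local offsets map; check your SDK version for the exact type.

package main

import (
	"encoding/json"
	"log"
	"os"

	"github.com/wikimedia-enterprise/wme-sdk-go/pkg/api"
)

// loadOffsets is a hypothetical helper: here it reads the per-partition map
// persisted by a previous run from a local JSON file (the path is illustrative)
func loadOffsets() (map[int]int64, error) {
	data, err := os.ReadFile("offsets.json")
	if err != nil {
		return nil, err
	}

	offsets := map[int]int64{}
	if err := json.Unmarshal(data, &offsets); err != nil {
		return nil, err
	}

	return offsets, nil
}

func main() {
	storedOffsets, err := loadOffsets()
	if err != nil {
		log.Fatalln(err)
	}

	// rebuild the same request as before, adding the stored offsets so each
	// partition resumes from the last consumed event; the Offsets field type
	// is assumed here, as noted in the lead-in above
	apiRequest := &api.Request{
		Parts:   []int{0, 1, 2, 3, 4},
		Fields:  []string{"name", "abstract", "event.*"},
		Offsets: storedOffsets,
	}

	// pass apiRequest to apiClient.StreamArticles exactly as in the example
	// above; authentication is omitted here for brevity
	log.Printf("resume request: %+v", apiRequest)
}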
For more extensive examples and technical details, please see our SDKs (Go and Python).
Wrap Up
The introduction of Parallel Connections and Restart support in the Realtime API addresses the critical need for efficient and reliable data handling at scale.
To gain access to the Realtime API, please contact our team. If you’re already a customer, reach out to us with feedback (from within your account dashboard). We are committed to continually improving our services and look forward to supporting your projects with these new enhancements.
— Chuck Reynolds, Staff Product Manager, Growth