Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 27 additions & 11 deletions Sources/Segment/Plugins/SegmentDestination.swift
Original file line number Diff line number Diff line change
Expand Up @@ -227,33 +227,40 @@ extension SegmentDestination {
guard let httpClient = self.httpClient else { return }

let totalCount = storage.dataStore.count
var currentCount = 0

guard totalCount > 0 else { return }

while currentCount < totalCount {
// can't imagine why we wouldn't get data at this point, but if we don't, then split.
guard let eventData = storage.dataStore.fetch() else { return }
guard let data = eventData.data else { return }
guard let removable = eventData.removable else { return }
guard let dataCount = eventData.removable?.count else { return }
// Process events in flushAt-sized batches so each batch is independent,
// matching file mode behavior where each file gets its own upload.
// This prevents failed retry events from being merged with new events.
let batchSize = max(analytics.configuration.values.flushAt, 1)
var offset = 0

currentCount += dataCount
while offset < totalCount {
// can't imagine why we wouldn't get data at this point, but if we don't, then split.
guard let eventData = storage.dataStore.fetch(count: batchSize, offset: offset) else { break }
guard let data = eventData.data else { break }
guard let removable = eventData.removable else { break }
guard let dataCount = eventData.removable?.count else { break }

// Generate a stable batch identifier from the data content
let batchId = "mem-\(data.hashValue)"

// Check retry state machine before uploading
let decision = httpClient.checkBatchUpload(batchId: batchId)
switch decision {
case .skipAllBatches, .skipThisBatch:
// Backoff or rate limit in effect — skip this flush cycle
case .skipAllBatches:
// Rate limited globally — stop processing all batches
return
case .skipThisBatch:
// Backoff in effect for this batch — skip it, try the next
offset += dataCount
continue
case .dropBatch:
// Max retries or duration exceeded — drop the data
analytics.log(message: "Dropping batch \(batchId): retry limit exceeded")
analytics.reportInternalError(AnalyticsError.batchUploadFail(AnalyticsError.networkServerRejected(nil, 0)))
storage.remove(data: removable)
// Don't advance offset — removal shifted the array
continue
case .proceed:
break
Expand All @@ -266,6 +273,7 @@ extension SegmentDestination {
// we're already on a separate thread.
// lets let this task complete so we can get all the values out.
let semaphore = DispatchSemaphore(value: 0)
var didRemove = false

// set up the task
let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, data: data, batchId: batchId) { [weak self] result in
Expand All @@ -279,12 +287,14 @@ extension SegmentDestination {
switch result {
case .success(_):
storage.remove(data: removable)
didRemove = true
cleanupUploads()

// Non-retryable status codes should drop the batch
case .failure(Segment.HTTPClientErrors.statusCode(let code)):
if httpClient.shouldDropBatch(forStatusCode: code) {
storage.remove(data: removable)
didRemove = true
}
cleanupUploads()
default:
Expand All @@ -305,6 +315,12 @@ extension SegmentDestination {
}

_ = semaphore.wait(timeout: .distantFuture)

// If items were removed, the array shifted — offset stays.
// If items stayed (retryable failure), advance offset past them.
if !didRemove {
offset += dataCount
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/Segment/Utilities/Storage/DataStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ public protocol DataStore {
init(configuration: StoreConfiguration)
func reset()
func append(data: RawEvent)
func fetch(count: Int?, maxBytes: Int?) -> DataResult?
/// Fetches a batch of stored items, skipping the first `offset` items.
/// - Parameters:
///   - count: Maximum number of items to return; nil leaves the limit to the store.
///   - maxBytes: Byte-size cap on the returned batch; nil leaves the cap to the store.
///   - offset: Number of leading items to skip before accumulating results,
///     letting callers iterate batches without removing earlier ones first.
/// - Returns: The fetched items wrapped in a `DataResult`, or nil when nothing is available.
/// NOTE(review): offset is an item index into the store's current contents — confirm
/// each conforming store keeps this stable while callers interleave removals.
func fetch(count: Int?, maxBytes: Int?, offset: Int) -> DataResult?
func remove(data: [ItemID])
}
4 changes: 2 additions & 2 deletions Sources/Segment/Utilities/Storage/TransientDB.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ public class TransientDB {
}
}

public func fetch(count: Int? = nil, maxBytes: Int? = nil) -> DataResult? {
/// Thread-safe fetch that forwards to the underlying store.
///
/// Access to `store` is serialized on `syncQueue` so fetches do not race
/// concurrent appends or removals.
///
/// - Parameters:
///   - count: Maximum number of items to return; nil defers to the store.
///   - maxBytes: Byte-size cap for the batch; nil defers to the store.
///   - offset: Number of leading items to skip (defaults to 0).
/// - Returns: The store's `DataResult`, or nil when nothing was fetched.
public func fetch(count: Int? = nil, maxBytes: Int? = nil, offset: Int = 0) -> DataResult? {
syncQueue.sync {
store.fetch(count: count, maxBytes: maxBytes, offset: offset)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class DirectoryStore: DataStore {
}
}

public func fetch(count: Int?, maxBytes: Int?) -> DataResult? {
public func fetch(count: Int?, maxBytes: Int?, offset: Int = 0) -> DataResult? {
if writer != nil {
finishFile()
}
Expand Down
11 changes: 8 additions & 3 deletions Sources/Segment/Utilities/Storage/Types/MemoryStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,19 @@ public class MemoryStore: DataStore {
}
}

public func fetch(count: Int?, maxBytes: Int?) -> DataResult? {
public func fetch(count: Int?, maxBytes: Int?, offset: Int = 0) -> DataResult? {
var skipped = 0
var accumulatedCount = 0
var accumulatedSize: Int = 0
var results = [ItemData]()

let maxBytes = maxBytes ?? config.maxFetchSize

for item in items {
if skipped < offset {
skipped += 1
continue
}
if accumulatedSize + item.data.count > maxBytes {
break
}
Expand Down