fix(PROJ-35): OCR Boot-Resume drosselt nach Queue-Kapazitaet
This commit is contained in:
+38
-12
@@ -187,20 +187,46 @@ func main() {
|
|||||||
"pdftotext", ts.HasPdftotext, "tesseract", ts.HasTesseract, "pdftoppm", ts.HasPdftoppm)
|
"pdftotext", ts.HasPdftotext, "tesseract", ts.HasTesseract, "pdftoppm", ts.HasPdftoppm)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Boot-resume: re-enqueue all mails still marked ocr_status='pending'.
|
// Boot-resume: re-enqueue mails still marked ocr_status='pending'.
|
||||||
|
// The worker queue holds 1000 slots; we refill it from the DB whenever
|
||||||
|
// it falls below half capacity. Each iteration only fetches as many
|
||||||
|
// pending mails as currently fit, so nothing is dropped.
|
||||||
|
// The worker updates ocr_status to done/failed/skipped, so subsequent
|
||||||
|
// queries only return genuinely outstanding jobs.
|
||||||
go func() {
|
go func() {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
pending, err := mailStore.GetPendingOCRMails(ctx, nil, 5000)
|
queueCap := 1000 // matches ocr.Options.QueueSize above
|
||||||
if err != nil {
|
processed := 0
|
||||||
logger.Warn("ocr boot-resume: query failed", "err", err)
|
idleRounds := 0
|
||||||
return
|
for {
|
||||||
}
|
free := queueCap - ocrWorker.QueueLen()
|
||||||
if len(pending) == 0 {
|
if free < queueCap/2 {
|
||||||
return
|
time.Sleep(2 * time.Second)
|
||||||
}
|
continue
|
||||||
logger.Info("ocr boot-resume: re-enqueueing pending jobs", "count", len(pending))
|
}
|
||||||
for _, m := range pending {
|
pending, err := mailStore.GetPendingOCRMails(ctx, nil, free)
|
||||||
ocrWorker.Submit(m.ID, m.TenantID)
|
if err != nil {
|
||||||
|
logger.Warn("ocr boot-resume: query failed", "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(pending) == 0 {
|
||||||
|
idleRounds++
|
||||||
|
if idleRounds >= 3 {
|
||||||
|
if processed > 0 {
|
||||||
|
logger.Info("ocr boot-resume: backlog drained", "total", processed)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
time.Sleep(10 * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
idleRounds = 0
|
||||||
|
for _, m := range pending {
|
||||||
|
ocrWorker.Submit(m.ID, m.TenantID)
|
||||||
|
}
|
||||||
|
processed += len(pending)
|
||||||
|
logger.Info("ocr boot-resume: enqueued batch",
|
||||||
|
"batch", len(pending), "total_so_far", processed)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user