substitute-binary: Pipeline HTTP requests instead of using threads.

* guix/scripts/substitute-binary.scm (fetch-narinfo, %lookup-threads,
  n-par-map*): Remove.
  (narinfo-cache-file, cached-narinfo, cache-narinfo!, narinfo-request,
  http-multiple-get, read-to-eof, fetch-narinfos, lookup-narinfos,
  narinfo-from-file): New procedures.
  (lookup-narinfo): Rewrite in terms of 'lookup-narinfos'.
  (guix-substitute-binary): Use 'lookup-narinfos' instead of
  'lookup-narinfo'.
This commit is contained in:
Ludovic Courtès 2015-03-23 22:25:04 +01:00
parent 0561e9ae16
commit d3a652037e
1 changed files with 192 additions and 78 deletions

View File

@ -28,7 +28,7 @@
#:use-module (guix base64) #:use-module (guix base64)
#:use-module (guix pk-crypto) #:use-module (guix pk-crypto)
#:use-module (guix pki) #:use-module (guix pki)
#:use-module ((guix build utils) #:select (mkdir-p)) #:use-module ((guix build utils) #:select (mkdir-p dump-port))
#:use-module ((guix build download) #:use-module ((guix build download)
#:select (progress-proc uri-abbreviation)) #:select (progress-proc uri-abbreviation))
#:use-module (ice-9 rdelim) #:use-module (ice-9 rdelim)
@ -48,6 +48,8 @@
#:use-module (srfi srfi-34) #:use-module (srfi srfi-34)
#:use-module (srfi srfi-35) #:use-module (srfi srfi-35)
#:use-module (web uri) #:use-module (web uri)
#:use-module (web request)
#:use-module (web response)
#:use-module (guix http-client) #:use-module (guix http-client)
#:export (narinfo-signature->canonical-sexp #:export (narinfo-signature->canonical-sexp
read-narinfo read-narinfo
@ -218,7 +220,7 @@ failure."
gonna have to wait." gonna have to wait."
(delay (begin (delay (begin
(format (current-error-port) (format (current-error-port)
(_ "updating list of substitutes from '~a'...~%") (_ "updating list of substitutes from '~a'...\r")
url) url)
(open-cache url)))) (open-cache url))))
@ -380,40 +382,56 @@ or is signed by an unauthorized key."
the cache STR originates form." the cache STR originates form."
(call-with-input-string str (cut read-narinfo <> cache-uri))) (call-with-input-string str (cut read-narinfo <> cache-uri)))
(define (fetch-narinfo cache path)
"Return the <narinfo> record for PATH, or #f if CACHE does not hold PATH."
(define (download url)
;; Download the .narinfo from URL, and return its contents as a list of
;; key/value pairs. Don't emit an error message upon 404.
(false-if-exception (fetch (string->uri url)
#:quiet-404? #t)))
(and (string=? (cache-store-directory cache) (%store-prefix))
(and=> (download (string-append (cache-url cache) "/"
(store-path-hash-part path)
".narinfo"))
(cute read-narinfo <> (cache-url cache)))))
(define (obsolete? date now ttl) (define (obsolete? date now ttl)
"Return #t if DATE is obsolete compared to NOW + TTL seconds." "Return #t if DATE is obsolete compared to NOW + TTL seconds."
(time>? (subtract-duration now (make-time time-duration 0 ttl)) (time>? (subtract-duration now (make-time time-duration 0 ttl))
(make-time time-monotonic 0 date))) (make-time time-monotonic 0 date)))
(define %lookup-threads
;; Number of threads spawned to perform lookup operations. This means we
;; can have this many simultaneous HTTP GET requests to the server, which
;; limits the impact of connection latency.
20)
(define (lookup-narinfo cache path) (define (narinfo-cache-file path)
"Check locally if we have valid info about PATH, otherwise go to CACHE and "Return the name of the local file that contains an entry for PATH."
check what it has." (string-append %narinfo-cache-directory "/"
(store-path-hash-part path)))
(define (cached-narinfo path)
"Check locally if we have valid info about PATH. Return two values: a
Boolean indicating whether we have valid cached info, and that info, which may
be either #f (when PATH is unavailable) or the narinfo for PATH."
(define now (define now
(current-time time-monotonic)) (current-time time-monotonic))
(define cache-file (define cache-file
(string-append %narinfo-cache-directory "/" (narinfo-cache-file path))
(store-path-hash-part path)))
(catch 'system-error
(lambda ()
(call-with-input-file cache-file
(lambda (p)
(match (read p)
(('narinfo ('version 1)
('cache-uri cache-uri)
('date date) ('value #f))
;; A cached negative lookup.
(if (obsolete? date now %narinfo-negative-ttl)
(values #f #f)
(values #t #f)))
(('narinfo ('version 1)
('cache-uri cache-uri)
('date date) ('value value))
;; A cached positive lookup
(if (obsolete? date now %narinfo-ttl)
(values #f #f)
(values #t (string->narinfo value cache-uri))))
(('narinfo ('version v) _ ...)
(values #f #f))))))
(lambda _
(values #f #f))))
(define (cache-narinfo! cache path narinfo)
"Cache locally NARNIFO for PATH, which originates from CACHE. NARINFO may
be #f, in which case it indicates that PATH is unavailable at CACHE."
(define now
(current-time time-monotonic))
(define (cache-entry cache-uri narinfo) (define (cache-entry cache-uri narinfo)
`(narinfo (version 1) `(narinfo (version 1)
@ -421,43 +439,153 @@ check what it has."
(date ,(time-second now)) (date ,(time-second now))
(value ,(and=> narinfo narinfo->string)))) (value ,(and=> narinfo narinfo->string))))
(let*-values (((valid? cached) (with-atomic-file-output (narinfo-cache-file path)
(catch 'system-error (lambda (out)
(lambda () (write (cache-entry (cache-url cache) narinfo) out)))
(call-with-input-file cache-file narinfo)
(lambda (p)
(match (read p) (define (narinfo-request cache-url path)
(('narinfo ('version 1) "Return an HTTP request for the narinfo of PATH at CACHE-URL."
('cache-uri cache-uri) (let ((url (string-append cache-url "/" (store-path-hash-part path)
('date date) ('value #f)) ".narinfo")))
;; A cached negative lookup. (build-request (string->uri url) #:method 'GET)))
(if (obsolete? date now %narinfo-negative-ttl)
(values #f #f) (define (http-multiple-get base-url requests proc)
(values #t #f))) "Send all of REQUESTS to the server at BASE-URL. Call PROC for each
(('narinfo ('version 1) response, passing it the request object, the response, and a port from which
('cache-uri cache-uri) to read the response body. Return the list of results."
('date date) ('value value)) (let connect ((requests requests)
;; A cached positive lookup (result '()))
(if (obsolete? date now %narinfo-ttl) ;; (format (current-error-port) "connecting (~a requests left)..."
(values #f #f) ;; (length requests))
(values #t (string->narinfo value (let ((p (open-socket-for-uri base-url)))
cache-uri)))) ;; Send all of REQUESTS in a row.
(('narinfo ('version v) _ ...) (setvbuf p _IOFBF (expt 2 16))
(values #f #f)))))) (for-each (cut write-request <> p) requests)
(lambda _ (force-output p)
(values #f #f)))))
(if valid? ;; Now start processing responses.
cached ; including negative caches (let loop ((requests requests)
(result result))
(match requests
(()
(reverse result))
((head tail ...)
(let* ((resp (read-response p))
(body (response-body-port resp)))
;; The server can choose to stop responding at any time, in which
;; case we have to try again. Check whether that is the case.
(match (assq 'connection (response-headers resp))
(('connection 'close)
(connect requests result)) ;try again
(_
(loop tail ;keep going
(cons (proc head resp body) result)))))))))))
(define (read-to-eof port)
"Read from PORT until EOF is reached. The data are discarded."
(dump-port port (%make-void-port "w")))
(define (narinfo-from-file file url)
"Attempt to read a narinfo from FILE, using URL as the cache URL. Return #f
if file doesn't exist, and the narinfo otherwise."
(catch 'system-error
(lambda ()
(call-with-input-file file
(cut read-narinfo <> url)))
(lambda args
(if (= ENOENT (system-error-errno args))
#f
(apply throw args)))))
(define (fetch-narinfos cache paths)
"Retrieve all the narinfos for PATHS from CACHE and return them."
(define url
(cache-url cache))
(define update-progress!
(let ((done 0))
(lambda ()
(display #\cr (current-error-port))
(force-output (current-error-port))
(format (current-error-port)
(_ "updating list of substitutes from '~a'... ~5,1f%")
url (* 100. (/ done (length paths))))
(set! done (+ 1 done)))))
(define (handle-narinfo-response request response port)
(let ((len (response-content-length response)))
;; Make sure to read no more than LEN bytes since subsequent bytes may
;; belong to the next response.
(case (response-code response)
((200) ; hit
(let ((narinfo (read-narinfo port url #:size len)))
(cache-narinfo! cache (narinfo-path narinfo) narinfo)
(update-progress!)
narinfo))
((404) ; failure
(let* ((path (uri-path (request-uri request)))
(hash-part (string-drop-right path 8))) ; drop ".narinfo"
(if len
(get-bytevector-n port len)
(read-to-eof port))
(cache-narinfo! cache
(find (cut string-contains <> hash-part) paths)
#f)
(update-progress!))
#f)
(else ; transient failure
(if len
(get-bytevector-n port len)
(read-to-eof port))
#f))))
(and (string=? (cache-store-directory cache) (%store-prefix))
(let ((uri (string->uri url)))
(case (and=> uri uri-scheme)
((http)
(let ((requests (map (cut narinfo-request url <>) paths)))
(update-progress!)
(let ((result (http-multiple-get url requests
handle-narinfo-response)))
(newline (current-error-port))
result)))
((file #f)
(let* ((base (string-append (uri-path uri) "/"))
(files (map (compose (cut string-append base <> ".narinfo")
store-path-hash-part)
paths)))
(filter-map (cut narinfo-from-file <> url) files)))
(else
(leave (_ "~s: unsupported server URI scheme~%")
(if uri (uri-scheme uri) url)))))))
(define (lookup-narinfos cache paths)
"Return the narinfos for PATHS, invoking the server at CACHE when no
information is available locally."
(let-values (((cached missing)
(fold2 (lambda (path cached missing)
(let-values (((valid? value)
(cached-narinfo path)))
(if valid?
(values (cons value cached) missing)
(values cached (cons path missing)))))
'()
'()
paths)))
(if (null? missing)
cached
(let* ((cache (force cache)) (let* ((cache (force cache))
(narinfo (and cache (fetch-narinfo cache path)))) (missing (if cache
;; Cache NARINFO only when CACHE was actually accessible. This (fetch-narinfos cache missing)
;; avoids caching negative hits when in fact we just lacked network '())))
;; access. (append cached missing)))))
(when cache
(with-atomic-file-output cache-file (define (lookup-narinfo cache path)
(lambda (out) "Return the narinfo for PATH in CACHE, or #f when no substitute for PATH was
(write (cache-entry (cache-url cache) narinfo) out)))) found."
narinfo)))) (match (lookup-narinfos cache (list path))
((answer) answer)))
(define (remove-expired-cached-narinfos) (define (remove-expired-cached-narinfos)
"Remove expired narinfo entries from the cache. The sole purpose of this "Remove expired narinfo entries from the cache. The sole purpose of this
@ -580,16 +708,6 @@ Internal tool to substitute a pre-built binary to a local build.\n"))
;;; Entry point. ;;; Entry point.
;;; ;;;
(define n-par-map*
;; We want the ability to run many threads in parallel, regardless of the
;; number of cores. However, Guile 2.0.5 has a bug whereby 'n-par-map' ends
;; up consuming a lot of memory, possibly leading to death. Thus, resort to
;; 'par-map' on 2.0.5.
(if (guile-version>? "2.0.5")
n-par-map
(lambda (n proc lst)
(par-map proc lst))))
(define (check-acl-initialized) (define (check-acl-initialized)
"Warn if the ACL is uninitialized." "Warn if the ACL is uninitialized."
(define (singleton? acl) (define (singleton? acl)
@ -698,9 +816,7 @@ substituter disabled~%")
;; Return the subset of PATHS available in CACHE. ;; Return the subset of PATHS available in CACHE.
(let ((substitutable (let ((substitutable
(if cache (if cache
(n-par-map* %lookup-threads (lookup-narinfos cache paths)
(cut lookup-narinfo cache <>)
paths)
'()))) '())))
(for-each (lambda (narinfo) (for-each (lambda (narinfo)
(format #t "~a~%" (narinfo-path narinfo))) (format #t "~a~%" (narinfo-path narinfo)))
@ -710,9 +826,7 @@ substituter disabled~%")
;; Reply info about PATHS if it's in CACHE. ;; Reply info about PATHS if it's in CACHE.
(let ((substitutable (let ((substitutable
(if cache (if cache
(n-par-map* %lookup-threads (lookup-narinfos cache paths)
(cut lookup-narinfo cache <>)
paths)
'()))) '())))
(for-each (lambda (narinfo) (for-each (lambda (narinfo)
(format #t "~a\n~a\n~a\n" (format #t "~a\n~a\n~a\n"