Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / google.golang.org / appengine / internal / api.go
1 // Copyright 2011 Google Inc. All rights reserved.
2 // Use of this source code is governed by the Apache 2.0
3 // license that can be found in the LICENSE file.
4
5 // +build !appengine
6
7 package internal
8
9 import (
10         "bytes"
11         "errors"
12         "fmt"
13         "io/ioutil"
14         "log"
15         "net"
16         "net/http"
17         "net/url"
18         "os"
19         "runtime"
20         "strconv"
21         "strings"
22         "sync"
23         "sync/atomic"
24         "time"
25
26         "github.com/golang/protobuf/proto"
27         netcontext "golang.org/x/net/context"
28
29         basepb "google.golang.org/appengine/internal/base"
30         logpb "google.golang.org/appengine/internal/log"
31         remotepb "google.golang.org/appengine/internal/remote_api"
32 )
33
34 const (
35         apiPath             = "/rpc_http"
36         defaultTicketSuffix = "/default.20150612t184001.0"
37 )
38
39 var (
40         // Incoming headers.
41         ticketHeader       = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
42         dapperHeader       = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
43         traceHeader        = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
44         curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
45         userIPHeader       = http.CanonicalHeaderKey("X-AppEngine-User-IP")
46         remoteAddrHeader   = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")
47         devRequestIdHeader = http.CanonicalHeaderKey("X-Appengine-Dev-Request-Id")
48
49         // Outgoing headers.
50         apiEndpointHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
51         apiEndpointHeaderValue = []string{"app-engine-apis"}
52         apiMethodHeader        = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
53         apiMethodHeaderValue   = []string{"/VMRemoteAPI.CallRemoteAPI"}
54         apiDeadlineHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
55         apiContentType         = http.CanonicalHeaderKey("Content-Type")
56         apiContentTypeValue    = []string{"application/octet-stream"}
57         logFlushHeader         = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")
58
59         apiHTTPClient = &http.Client{
60                 Transport: &http.Transport{
61                         Proxy: http.ProxyFromEnvironment,
62                         Dial:  limitDial,
63                 },
64         }
65
66         defaultTicketOnce     sync.Once
67         defaultTicket         string
68         backgroundContextOnce sync.Once
69         backgroundContext     netcontext.Context
70 )
71
72 func apiURL() *url.URL {
73         host, port := "appengine.googleapis.internal", "10001"
74         if h := os.Getenv("API_HOST"); h != "" {
75                 host = h
76         }
77         if p := os.Getenv("API_PORT"); p != "" {
78                 port = p
79         }
80         return &url.URL{
81                 Scheme: "http",
82                 Host:   host + ":" + port,
83                 Path:   apiPath,
84         }
85 }
86
87 func handleHTTP(w http.ResponseWriter, r *http.Request) {
88         c := &context{
89                 req:       r,
90                 outHeader: w.Header(),
91                 apiURL:    apiURL(),
92         }
93         r = r.WithContext(withContext(r.Context(), c))
94         c.req = r
95
96         stopFlushing := make(chan int)
97
98         // Patch up RemoteAddr so it looks reasonable.
99         if addr := r.Header.Get(userIPHeader); addr != "" {
100                 r.RemoteAddr = addr
101         } else if addr = r.Header.Get(remoteAddrHeader); addr != "" {
102                 r.RemoteAddr = addr
103         } else {
104                 // Should not normally reach here, but pick a sensible default anyway.
105                 r.RemoteAddr = "127.0.0.1"
106         }
107         // The address in the headers will most likely be of these forms:
108         //      123.123.123.123
109         //      2001:db8::1
110         // net/http.Request.RemoteAddr is specified to be in "IP:port" form.
111         if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
112                 // Assume the remote address is only a host; add a default port.
113                 r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
114         }
115
116         // Start goroutine responsible for flushing app logs.
117         // This is done after adding c to ctx.m (and stopped before removing it)
118         // because flushing logs requires making an API call.
119         go c.logFlusher(stopFlushing)
120
121         executeRequestSafely(c, r)
122         c.outHeader = nil // make sure header changes aren't respected any more
123
124         stopFlushing <- 1 // any logging beyond this point will be dropped
125
126         // Flush any pending logs asynchronously.
127         c.pendingLogs.Lock()
128         flushes := c.pendingLogs.flushes
129         if len(c.pendingLogs.lines) > 0 {
130                 flushes++
131         }
132         c.pendingLogs.Unlock()
133         flushed := make(chan struct{})
134         go func() {
135                 defer close(flushed)
136                 // Force a log flush, because with very short requests we
137                 // may not ever flush logs.
138                 c.flushLog(true)
139         }()
140         w.Header().Set(logFlushHeader, strconv.Itoa(flushes))
141
142         // Avoid nil Write call if c.Write is never called.
143         if c.outCode != 0 {
144                 w.WriteHeader(c.outCode)
145         }
146         if c.outBody != nil {
147                 w.Write(c.outBody)
148         }
149         // Wait for the last flush to complete before returning,
150         // otherwise the security ticket will not be valid.
151         <-flushed
152 }
153
154 func executeRequestSafely(c *context, r *http.Request) {
155         defer func() {
156                 if x := recover(); x != nil {
157                         logf(c, 4, "%s", renderPanic(x)) // 4 == critical
158                         c.outCode = 500
159                 }
160         }()
161
162         http.DefaultServeMux.ServeHTTP(c, r)
163 }
164
165 func renderPanic(x interface{}) string {
166         buf := make([]byte, 16<<10) // 16 KB should be plenty
167         buf = buf[:runtime.Stack(buf, false)]
168
169         // Remove the first few stack frames:
170         //   this func
171         //   the recover closure in the caller
172         // That will root the stack trace at the site of the panic.
173         const (
174                 skipStart  = "internal.renderPanic"
175                 skipFrames = 2
176         )
177         start := bytes.Index(buf, []byte(skipStart))
178         p := start
179         for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ {
180                 p = bytes.IndexByte(buf[p+1:], '\n') + p + 1
181                 if p < 0 {
182                         break
183                 }
184         }
185         if p >= 0 {
186                 // buf[start:p+1] is the block to remove.
187                 // Copy buf[p+1:] over buf[start:] and shrink buf.
188                 copy(buf[start:], buf[p+1:])
189                 buf = buf[:len(buf)-(p+1-start)]
190         }
191
192         // Add panic heading.
193         head := fmt.Sprintf("panic: %v\n\n", x)
194         if len(head) > len(buf) {
195                 // Extremely unlikely to happen.
196                 return head
197         }
198         copy(buf[len(head):], buf)
199         copy(buf, head)
200
201         return string(buf)
202 }
203
204 // context represents the context of an in-flight HTTP request.
205 // It implements the appengine.Context and http.ResponseWriter interfaces.
206 type context struct {
207         req *http.Request
208
209         outCode   int
210         outHeader http.Header
211         outBody   []byte
212
213         pendingLogs struct {
214                 sync.Mutex
215                 lines   []*logpb.UserAppLogLine
216                 flushes int
217         }
218
219         apiURL *url.URL
220 }
221
222 var contextKey = "holds a *context"
223
224 // jointContext joins two contexts in a superficial way.
225 // It takes values and timeouts from a base context, and only values from another context.
226 type jointContext struct {
227         base       netcontext.Context
228         valuesOnly netcontext.Context
229 }
230
231 func (c jointContext) Deadline() (time.Time, bool) {
232         return c.base.Deadline()
233 }
234
235 func (c jointContext) Done() <-chan struct{} {
236         return c.base.Done()
237 }
238
239 func (c jointContext) Err() error {
240         return c.base.Err()
241 }
242
243 func (c jointContext) Value(key interface{}) interface{} {
244         if val := c.base.Value(key); val != nil {
245                 return val
246         }
247         return c.valuesOnly.Value(key)
248 }
249
250 // fromContext returns the App Engine context or nil if ctx is not
251 // derived from an App Engine context.
252 func fromContext(ctx netcontext.Context) *context {
253         c, _ := ctx.Value(&contextKey).(*context)
254         return c
255 }
256
257 func withContext(parent netcontext.Context, c *context) netcontext.Context {
258         ctx := netcontext.WithValue(parent, &contextKey, c)
259         if ns := c.req.Header.Get(curNamespaceHeader); ns != "" {
260                 ctx = withNamespace(ctx, ns)
261         }
262         return ctx
263 }
264
265 func toContext(c *context) netcontext.Context {
266         return withContext(netcontext.Background(), c)
267 }
268
269 func IncomingHeaders(ctx netcontext.Context) http.Header {
270         if c := fromContext(ctx); c != nil {
271                 return c.req.Header
272         }
273         return nil
274 }
275
276 func ReqContext(req *http.Request) netcontext.Context {
277         return req.Context()
278 }
279
280 func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context {
281         return jointContext{
282                 base:       parent,
283                 valuesOnly: req.Context(),
284         }
285 }
286
287 // DefaultTicket returns a ticket used for background context or dev_appserver.
288 func DefaultTicket() string {
289         defaultTicketOnce.Do(func() {
290                 if IsDevAppServer() {
291                         defaultTicket = "testapp" + defaultTicketSuffix
292                         return
293                 }
294                 appID := partitionlessAppID()
295                 escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1)
296                 majVersion := VersionID(nil)
297                 if i := strings.Index(majVersion, "."); i > 0 {
298                         majVersion = majVersion[:i]
299                 }
300                 defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID())
301         })
302         return defaultTicket
303 }
304
305 func BackgroundContext() netcontext.Context {
306         backgroundContextOnce.Do(func() {
307                 // Compute background security ticket.
308                 ticket := DefaultTicket()
309
310                 c := &context{
311                         req: &http.Request{
312                                 Header: http.Header{
313                                         ticketHeader: []string{ticket},
314                                 },
315                         },
316                         apiURL: apiURL(),
317                 }
318                 backgroundContext = toContext(c)
319
320                 // TODO(dsymonds): Wire up the shutdown handler to do a final flush.
321                 go c.logFlusher(make(chan int))
322         })
323
324         return backgroundContext
325 }
326
327 // RegisterTestRequest registers the HTTP request req for testing, such that
328 // any API calls are sent to the provided URL. It returns a closure to delete
329 // the registration.
330 // It should only be used by aetest package.
331 func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) {
332         c := &context{
333                 req:    req,
334                 apiURL: apiURL,
335         }
336         ctx := withContext(decorate(req.Context()), c)
337         req = req.WithContext(ctx)
338         c.req = req
339         return req, func() {}
340 }
341
342 var errTimeout = &CallError{
343         Detail:  "Deadline exceeded",
344         Code:    int32(remotepb.RpcError_CANCELLED),
345         Timeout: true,
346 }
347
348 func (c *context) Header() http.Header { return c.outHeader }
349
350 // Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status
351 // codes do not permit a response body (nor response entity headers such as
352 // Content-Length, Content-Type, etc).
353 func bodyAllowedForStatus(status int) bool {
354         switch {
355         case status >= 100 && status <= 199:
356                 return false
357         case status == 204:
358                 return false
359         case status == 304:
360                 return false
361         }
362         return true
363 }
364
365 func (c *context) Write(b []byte) (int, error) {
366         if c.outCode == 0 {
367                 c.WriteHeader(http.StatusOK)
368         }
369         if len(b) > 0 && !bodyAllowedForStatus(c.outCode) {
370                 return 0, http.ErrBodyNotAllowed
371         }
372         c.outBody = append(c.outBody, b...)
373         return len(b), nil
374 }
375
376 func (c *context) WriteHeader(code int) {
377         if c.outCode != 0 {
378                 logf(c, 3, "WriteHeader called multiple times on request.") // error level
379                 return
380         }
381         c.outCode = code
382 }
383
384 func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) {
385         hreq := &http.Request{
386                 Method: "POST",
387                 URL:    c.apiURL,
388                 Header: http.Header{
389                         apiEndpointHeader: apiEndpointHeaderValue,
390                         apiMethodHeader:   apiMethodHeaderValue,
391                         apiContentType:    apiContentTypeValue,
392                         apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)},
393                 },
394                 Body:          ioutil.NopCloser(bytes.NewReader(body)),
395                 ContentLength: int64(len(body)),
396                 Host:          c.apiURL.Host,
397         }
398         if info := c.req.Header.Get(dapperHeader); info != "" {
399                 hreq.Header.Set(dapperHeader, info)
400         }
401         if info := c.req.Header.Get(traceHeader); info != "" {
402                 hreq.Header.Set(traceHeader, info)
403         }
404
405         tr := apiHTTPClient.Transport.(*http.Transport)
406
407         var timedOut int32 // atomic; set to 1 if timed out
408         t := time.AfterFunc(timeout, func() {
409                 atomic.StoreInt32(&timedOut, 1)
410                 tr.CancelRequest(hreq)
411         })
412         defer t.Stop()
413         defer func() {
414                 // Check if timeout was exceeded.
415                 if atomic.LoadInt32(&timedOut) != 0 {
416                         err = errTimeout
417                 }
418         }()
419
420         hresp, err := apiHTTPClient.Do(hreq)
421         if err != nil {
422                 return nil, &CallError{
423                         Detail: fmt.Sprintf("service bridge HTTP failed: %v", err),
424                         Code:   int32(remotepb.RpcError_UNKNOWN),
425                 }
426         }
427         defer hresp.Body.Close()
428         hrespBody, err := ioutil.ReadAll(hresp.Body)
429         if hresp.StatusCode != 200 {
430                 return nil, &CallError{
431                         Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody),
432                         Code:   int32(remotepb.RpcError_UNKNOWN),
433                 }
434         }
435         if err != nil {
436                 return nil, &CallError{
437                         Detail: fmt.Sprintf("service bridge response bad: %v", err),
438                         Code:   int32(remotepb.RpcError_UNKNOWN),
439                 }
440         }
441         return hrespBody, nil
442 }
443
444 func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error {
445         if ns := NamespaceFromContext(ctx); ns != "" {
446                 if fn, ok := NamespaceMods[service]; ok {
447                         fn(in, ns)
448                 }
449         }
450
451         if f, ctx, ok := callOverrideFromContext(ctx); ok {
452                 return f(ctx, service, method, in, out)
453         }
454
455         // Handle already-done contexts quickly.
456         select {
457         case <-ctx.Done():
458                 return ctx.Err()
459         default:
460         }
461
462         c := fromContext(ctx)
463         if c == nil {
464                 // Give a good error message rather than a panic lower down.
465                 return errNotAppEngineContext
466         }
467
468         // Apply transaction modifications if we're in a transaction.
469         if t := transactionFromContext(ctx); t != nil {
470                 if t.finished {
471                         return errors.New("transaction context has expired")
472                 }
473                 applyTransaction(in, &t.transaction)
474         }
475
476         // Default RPC timeout is 60s.
477         timeout := 60 * time.Second
478         if deadline, ok := ctx.Deadline(); ok {
479                 timeout = deadline.Sub(time.Now())
480         }
481
482         data, err := proto.Marshal(in)
483         if err != nil {
484                 return err
485         }
486
487         ticket := c.req.Header.Get(ticketHeader)
488         // Use a test ticket under test environment.
489         if ticket == "" {
490                 if appid := ctx.Value(&appIDOverrideKey); appid != nil {
491                         ticket = appid.(string) + defaultTicketSuffix
492                 }
493         }
494         // Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver.
495         if ticket == "" {
496                 ticket = DefaultTicket()
497         }
498         if dri := c.req.Header.Get(devRequestIdHeader); IsDevAppServer() && dri != "" {
499                 ticket = dri
500         }
501         req := &remotepb.Request{
502                 ServiceName: &service,
503                 Method:      &method,
504                 Request:     data,
505                 RequestId:   &ticket,
506         }
507         hreqBody, err := proto.Marshal(req)
508         if err != nil {
509                 return err
510         }
511
512         hrespBody, err := c.post(hreqBody, timeout)
513         if err != nil {
514                 return err
515         }
516
517         res := &remotepb.Response{}
518         if err := proto.Unmarshal(hrespBody, res); err != nil {
519                 return err
520         }
521         if res.RpcError != nil {
522                 ce := &CallError{
523                         Detail: res.RpcError.GetDetail(),
524                         Code:   *res.RpcError.Code,
525                 }
526                 switch remotepb.RpcError_ErrorCode(ce.Code) {
527                 case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED:
528                         ce.Timeout = true
529                 }
530                 return ce
531         }
532         if res.ApplicationError != nil {
533                 return &APIError{
534                         Service: *req.ServiceName,
535                         Detail:  res.ApplicationError.GetDetail(),
536                         Code:    *res.ApplicationError.Code,
537                 }
538         }
539         if res.Exception != nil || res.JavaException != nil {
540                 // This shouldn't happen, but let's be defensive.
541                 return &CallError{
542                         Detail: "service bridge returned exception",
543                         Code:   int32(remotepb.RpcError_UNKNOWN),
544                 }
545         }
546         return proto.Unmarshal(res.Response, out)
547 }
548
549 func (c *context) Request() *http.Request {
550         return c.req
551 }
552
553 func (c *context) addLogLine(ll *logpb.UserAppLogLine) {
554         // Truncate long log lines.
555         // TODO(dsymonds): Check if this is still necessary.
556         const lim = 8 << 10
557         if len(*ll.Message) > lim {
558                 suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
559                 ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
560         }
561
562         c.pendingLogs.Lock()
563         c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
564         c.pendingLogs.Unlock()
565 }
566
567 var logLevelName = map[int64]string{
568         0: "DEBUG",
569         1: "INFO",
570         2: "WARNING",
571         3: "ERROR",
572         4: "CRITICAL",
573 }
574
575 func logf(c *context, level int64, format string, args ...interface{}) {
576         if c == nil {
577                 panic("not an App Engine context")
578         }
579         s := fmt.Sprintf(format, args...)
580         s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
581         c.addLogLine(&logpb.UserAppLogLine{
582                 TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
583                 Level:         &level,
584                 Message:       &s,
585         })
586         // Only duplicate log to stderr if not running on App Engine second generation
587         if !IsSecondGen() {
588                 log.Print(logLevelName[level] + ": " + s)
589         }
590 }
591
592 // flushLog attempts to flush any pending logs to the appserver.
593 // It should not be called concurrently.
594 func (c *context) flushLog(force bool) (flushed bool) {
595         c.pendingLogs.Lock()
596         // Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
597         n, rem := 0, 30<<20
598         for ; n < len(c.pendingLogs.lines); n++ {
599                 ll := c.pendingLogs.lines[n]
600                 // Each log line will require about 3 bytes of overhead.
601                 nb := proto.Size(ll) + 3
602                 if nb > rem {
603                         break
604                 }
605                 rem -= nb
606         }
607         lines := c.pendingLogs.lines[:n]
608         c.pendingLogs.lines = c.pendingLogs.lines[n:]
609         c.pendingLogs.Unlock()
610
611         if len(lines) == 0 && !force {
612                 // Nothing to flush.
613                 return false
614         }
615
616         rescueLogs := false
617         defer func() {
618                 if rescueLogs {
619                         c.pendingLogs.Lock()
620                         c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
621                         c.pendingLogs.Unlock()
622                 }
623         }()
624
625         buf, err := proto.Marshal(&logpb.UserAppLogGroup{
626                 LogLine: lines,
627         })
628         if err != nil {
629                 log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
630                 rescueLogs = true
631                 return false
632         }
633
634         req := &logpb.FlushRequest{
635                 Logs: buf,
636         }
637         res := &basepb.VoidProto{}
638         c.pendingLogs.Lock()
639         c.pendingLogs.flushes++
640         c.pendingLogs.Unlock()
641         if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
642                 log.Printf("internal.flushLog: Flush RPC: %v", err)
643                 rescueLogs = true
644                 return false
645         }
646         return true
647 }
648
649 const (
650         // Log flushing parameters.
651         flushInterval      = 1 * time.Second
652         forceFlushInterval = 60 * time.Second
653 )
654
655 func (c *context) logFlusher(stop <-chan int) {
656         lastFlush := time.Now()
657         tick := time.NewTicker(flushInterval)
658         for {
659                 select {
660                 case <-stop:
661                         // Request finished.
662                         tick.Stop()
663                         return
664                 case <-tick.C:
665                         force := time.Now().Sub(lastFlush) > forceFlushInterval
666                         if c.flushLog(force) {
667                                 lastFlush = time.Now()
668                         }
669                 }
670         }
671 }
672
673 func ContextForTesting(req *http.Request) netcontext.Context {
674         return toContext(&context{req: req})
675 }