Fix the dependency issue (#231)

This commit is contained in:
Robbie Zhang
2018-06-21 12:09:42 -07:00
committed by GitHub
parent 027b76651d
commit 6ec1098bb8
16629 changed files with 74837 additions and 4975021 deletions

View File

@@ -37,6 +37,13 @@ func (o *WaitReader) ReadResponse(response runtime.ClientResponse, consumer runt
}
return nil, result
case 428:
result := NewWaitPreconditionRequired()
if err := result.readResponse(response, consumer, o.formats); err != nil {
return nil, err
}
return nil, result
case 500:
result := NewWaitInternalServerError()
if err := result.readResponse(response, consumer, o.formats); err != nil {
@@ -99,6 +106,35 @@ func (o *WaitNotFound) readResponse(response runtime.ClientResponse, consumer ru
return nil
}
// NewWaitPreconditionRequired creates a WaitPreconditionRequired with default headers values
func NewWaitPreconditionRequired() *WaitPreconditionRequired {
return &WaitPreconditionRequired{}
}
/*WaitPreconditionRequired handles this case with default header values.
target resource is not powered on
*/
type WaitPreconditionRequired struct {
Payload *models.Error
}
func (o *WaitPreconditionRequired) Error() string {
return fmt.Sprintf("[PUT /tasks][%d] waitPreconditionRequired %+v", 428, o.Payload)
}
func (o *WaitPreconditionRequired) readResponse(response runtime.ClientResponse, consumer runtime.Consumer, formats strfmt.Registry) error {
o.Payload = new(models.Error)
// response payload
if err := consumer.Consume(response.Body(), o.Payload); err != nil && err != io.EOF {
return err
}
return nil
}
// NewWaitInternalServerError creates a WaitInternalServerError with default headers values
func NewWaitInternalServerError() *WaitInternalServerError {
return &WaitInternalServerError{}

View File

@@ -1,207 +0,0 @@
// Copyright 2016-2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package restapi
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
log "github.com/Sirupsen/logrus"
"github.com/go-openapi/errors"
"github.com/go-openapi/runtime"
"github.com/go-openapi/swag"
"github.com/tylerb/graceful"
"github.com/vmware/vic/lib/apiservers/portlayer/models"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/handlers"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/options"
"github.com/vmware/vic/lib/portlayer"
"github.com/vmware/vic/lib/portlayer/exec"
"github.com/vmware/vic/pkg/version"
"github.com/vmware/vic/pkg/vsphere/session"
)
// This file is safe to edit. Once it exists it will not be overwritten
type handler interface {
Configure(api *operations.PortLayerAPI, handlerCtx *handlers.HandlerContext)
}
var portlayerhandlers = []handler{
&handlers.StorageHandlersImpl{},
&handlers.MiscHandlersImpl{},
&handlers.ScopesHandlersImpl{},
&handlers.ContainersHandlersImpl{},
&handlers.InteractionHandlersImpl{},
&handlers.LoggingHandlersImpl{},
&handlers.KvHandlersImpl{},
&handlers.EventsHandlerImpl{},
&handlers.TaskHandlersImpl{},
}
var apiServers []*graceful.Server
const stopTimeout = time.Second * 3
func configureFlags(api *operations.PortLayerAPI) {
api.CommandLineOptionsGroups = []swag.CommandLineOptionsGroup{
{
LongDescription: "Port Layer Options",
Options: options.PortLayerOptions,
ShortDescription: "Port Layer Options",
},
}
}
func configureAPI(api *operations.PortLayerAPI) http.Handler {
api.Logger = log.Printf
ctx := context.Background()
sessionconfig := &session.Config{
Service: options.PortLayerOptions.SDK,
Insecure: options.PortLayerOptions.Insecure,
Keepalive: options.PortLayerOptions.Keepalive,
DatacenterPath: options.PortLayerOptions.DatacenterPath,
ClusterPath: options.PortLayerOptions.ClusterPath,
PoolPath: options.PortLayerOptions.PoolPath,
DatastorePath: options.PortLayerOptions.DatastorePath,
UserAgent: version.UserAgent("vic-engine"),
}
sess, err := session.NewSession(sessionconfig).Create(ctx)
if err != nil {
log.Fatalf("configure_port_layer ERROR: %s", err)
}
// Configure the func invoked if the PL panics or is restarted by vic-init
api.ServerShutdown = func() {
log.Infof("Shutting down port-layer-server")
// stop the event collectors
collectors := exec.Config.EventManager.Collectors()
for _, c := range collectors {
c.Stop()
}
// Logout the session
if err := sess.Logout(ctx); err != nil {
log.Warnf("unable to log out of session: %s", err)
}
}
// initialize the port layer
if err = portlayer.Init(ctx, sess); err != nil {
log.Fatalf("could not initialize port layer: %s", err)
}
// configure the api here
api.ServeError = errors.ServeError
// FIXME: after updated go-openapi/runtime vendor code, revert ByteStreamConsumer() back to runtime.ByteStreamConsumer()
api.BinConsumer = ByteStreamConsumer()
api.JSONConsumer = runtime.JSONConsumer()
api.TarConsumer = ByteStreamConsumer()
// FIXME: after updated go-openapi/runtime vendor code, revert ByteStreamProducer() back to runtime.ByteStreamProducer()
api.BinProducer = ByteStreamProducer()
api.JSONProducer = runtime.JSONProducer()
api.TarProducer = ByteStreamProducer()
api.TxtProducer = runtime.TextProducer()
handlerCtx := &handlers.HandlerContext{
Session: sess,
}
for _, handler := range portlayerhandlers {
handler.Configure(api, handlerCtx)
}
return setupGlobalMiddleware(api.Serve(setupMiddlewares))
}
// The TLS configuration before HTTPS server starts.
func configureTLS(tlsConfig *tls.Config) {
// Make all necessary changes to the TLS configuration here.
}
func StopAPIServers() {
for _, s := range apiServers {
s.Stop(stopTimeout)
}
}
// As soon as server is initialized but not run yet, this function will be called.
// If you need to modify a config, store server instance to stop it individually later, this is the place.
// This function can be called multiple times, depending on the number of serving schemes.
// scheme value will be set accordingly: "http", "https" or "unix"
func configureServer(s *graceful.Server, scheme string) {
s.NoSignalHandling = true
s.Timeout = stopTimeout
apiServers = append(apiServers, s)
}
// The middleware configuration is for the handler executors. These do not apply to the swagger.json document.
// The middleware executes after routing but before authentication, binding and validation
func setupMiddlewares(handler http.Handler) http.Handler {
return handler
}
// The middleware configuration happens before anything, this middleware also applies to serving the swagger.json document.
// So this is a good place to plug in a panic handling middleware, logging and metrics
func setupGlobalMiddleware(handler http.Handler) http.Handler {
return handler
}
// FIXME: to avoid update go-openapi/runtime vendor code at this time, write our own
// ByteStreamConsumer to read back encoded json format error message
func ByteStreamConsumer() runtime.Consumer {
wrapped := runtime.ByteStreamConsumer()
return runtime.ConsumerFunc(func(reader io.Reader, data interface{}) error {
if reader == nil {
return fmt.Errorf("ByteStreamConsumer requires a reader") // early exit
}
if er, ok := data.(*models.Error); ok {
dec := json.NewDecoder(reader)
return dec.Decode(er)
}
return wrapped.Consume(reader, data)
})
}
// FIXME: to avoid update go-openapi/runtime vendor code at this time, write our own
// ByteStreamProducer to encode error to json string
func ByteStreamProducer() runtime.Producer {
wrapped := runtime.ByteStreamProducer()
return runtime.ProducerFunc(func(writer io.Writer, data interface{}) error {
if writer == nil {
return fmt.Errorf("ByteStreamProducer requires a writer") // early exit
}
if er, ok := data.(*models.Error); ok {
enc := json.NewEncoder(writer)
return enc.Encode(er)
}
return wrapped.Produce(writer, data)
})
}

View File

@@ -1,679 +0,0 @@
// Copyright 2016-2018 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handlers
import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"encoding/json"
"encoding/pem"
"fmt"
"io"
"net/http"
"sort"
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/go-openapi/runtime/middleware"
"github.com/vmware/govmomi/vim25/types"
"github.com/vmware/vic/lib/apiservers/portlayer/models"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations/containers"
"github.com/vmware/vic/lib/config/executor"
"github.com/vmware/vic/lib/constants"
"github.com/vmware/vic/lib/iolog"
"github.com/vmware/vic/lib/migration/feature"
"github.com/vmware/vic/lib/portlayer/exec"
"github.com/vmware/vic/lib/portlayer/metrics"
"github.com/vmware/vic/pkg/ip"
"github.com/vmware/vic/pkg/trace"
"github.com/vmware/vic/pkg/uid"
"github.com/vmware/vic/pkg/version"
)
const (
containerWaitTimeout = 3 * time.Minute
)
// ContainersHandlersImpl is the receiver for all of the exec handler methods
type ContainersHandlersImpl struct {
handlerCtx *HandlerContext
}
// Configure assigns functions to all the exec api handlers
func (handler *ContainersHandlersImpl) Configure(api *operations.PortLayerAPI, handlerCtx *HandlerContext) {
api.ContainersCreateHandler = containers.CreateHandlerFunc(handler.CreateHandler)
api.ContainersStateChangeHandler = containers.StateChangeHandlerFunc(handler.StateChangeHandler)
api.ContainersGetHandler = containers.GetHandlerFunc(handler.GetHandler)
api.ContainersCommitHandler = containers.CommitHandlerFunc(handler.CommitHandler)
api.ContainersGetStateHandler = containers.GetStateHandlerFunc(handler.GetStateHandler)
api.ContainersContainerRemoveHandler = containers.ContainerRemoveHandlerFunc(handler.RemoveContainerHandler)
api.ContainersGetContainerInfoHandler = containers.GetContainerInfoHandlerFunc(handler.GetContainerInfoHandler)
api.ContainersGetContainerListHandler = containers.GetContainerListHandlerFunc(handler.GetContainerListHandler)
api.ContainersContainerSignalHandler = containers.ContainerSignalHandlerFunc(handler.ContainerSignalHandler)
api.ContainersGetContainerLogsHandler = containers.GetContainerLogsHandlerFunc(handler.GetContainerLogsHandler)
api.ContainersContainerWaitHandler = containers.ContainerWaitHandlerFunc(handler.ContainerWaitHandler)
api.ContainersContainerRenameHandler = containers.ContainerRenameHandlerFunc(handler.RenameContainerHandler)
api.ContainersGetContainerStatsHandler = containers.GetContainerStatsHandlerFunc(handler.GetContainerStatsHandler)
handler.handlerCtx = handlerCtx
}
// CreateHandler creates a new container
func (handler *ContainersHandlersImpl) CreateHandler(params containers.CreateParams) middleware.Responder {
var err error
id := uid.New().String()
op := trace.NewOperation(context.Background(), "create container %s", id)
defer trace.End(trace.Begin(id, op))
session := handler.handlerCtx.Session
// Init key for tether
// #nosec: RSA keys should be at least 2048 bits
// Size is 512 because key validation is not performed - see GitHub #2849
privateKey, err := rsa.GenerateKey(rand.Reader, 512)
if err != nil {
return containers.NewCreateNotFound().WithPayload(&models.Error{Message: err.Error()})
}
privateKeyBlock := pem.Block{
Type: "RSA PRIVATE KEY",
Headers: nil,
Bytes: x509.MarshalPKCS1PrivateKey(privateKey),
}
m := &executor.ExecutorConfig{
ExecutorConfigCommon: executor.ExecutorConfigCommon{
ID: id,
Name: params.CreateConfig.Name,
},
CreateTime: time.Now().UTC().UnixNano(),
Version: version.GetBuild(),
Key: pem.EncodeToMemory(&privateKeyBlock),
Hostname: params.CreateConfig.Hostname,
Domainname: params.CreateConfig.Domainname,
}
if params.CreateConfig.Annotations != nil && len(params.CreateConfig.Annotations) > 0 {
m.Annotations = make(map[string]string)
for k, v := range params.CreateConfig.Annotations {
m.Annotations[k] = v
}
}
// Create the executor.ExecutorCreateConfig
c := &exec.ContainerCreateConfig{
Metadata: m,
Resources: exec.Resources{
NumCPUs: params.CreateConfig.NumCpus,
MemoryMB: params.CreateConfig.MemoryMB,
},
}
h, err := exec.Create(op, session, c)
if err != nil {
op.Errorf("ContainerCreate error: %s", err.Error())
return containers.NewCreateNotFound().WithPayload(&models.Error{Message: err.Error()})
}
// send the container id back to the caller
return containers.NewCreateOK().WithPayload(&models.ContainerCreatedInfo{ID: id, Handle: h.String()})
}
// StateChangeHandler changes the state of a container
func (handler *ContainersHandlersImpl) StateChangeHandler(params containers.StateChangeParams) middleware.Responder {
defer trace.End(trace.Begin(fmt.Sprintf("handle(%s)", params.Handle)))
h := exec.GetHandle(params.Handle)
if h == nil {
return containers.NewStateChangeNotFound()
}
var state exec.State
switch params.State {
case "RUNNING":
state = exec.StateRunning
case "STOPPED":
state = exec.StateStopped
case "CREATED":
state = exec.StateCreated
default:
return containers.NewStateChangeDefault(http.StatusServiceUnavailable).WithPayload(&models.Error{Message: "unknown state"})
}
h.SetTargetState(state)
return containers.NewStateChangeOK().WithPayload(h.String())
}
func (handler *ContainersHandlersImpl) GetStateHandler(params containers.GetStateParams) middleware.Responder {
defer trace.End(trace.Begin(fmt.Sprintf("handle(%s)", params.Handle)))
// NOTE: I've no idea why GetStateHandler takes a handle instead of an ID - hopefully there was a reason for an inspection
// operation to take this path
h := exec.GetHandle(params.Handle)
if h == nil || h.ExecConfig == nil {
return containers.NewGetStateNotFound()
}
container := exec.Containers.Container(h.ExecConfig.ID)
if container == nil {
return containers.NewGetStateNotFound()
}
var state string
switch container.CurrentState() {
case exec.StateRunning:
state = "RUNNING"
case exec.StateStopped:
state = "STOPPED"
case exec.StateCreated:
state = "CREATED"
default:
return containers.NewGetStateDefault(http.StatusServiceUnavailable)
}
return containers.NewGetStateOK().WithPayload(
&models.ContainerGetStateResponse{
Handle: h.String(),
State: state,
})
}
func (handler *ContainersHandlersImpl) GetHandler(params containers.GetParams) middleware.Responder {
defer trace.End(trace.Begin(params.ID))
h := exec.GetContainer(context.Background(), uid.Parse(params.ID))
if h == nil {
return containers.NewGetNotFound().WithPayload(&models.Error{Message: fmt.Sprintf("container %s not found", params.ID)})
}
return containers.NewGetOK().WithPayload(h.String())
}
func (handler *ContainersHandlersImpl) CommitHandler(params containers.CommitParams) middleware.Responder {
op := trace.NewOperation(context.Background(), fmt.Sprintf("commit handle(%s)", params.Handle))
defer trace.End(trace.Begin(fmt.Sprintf("commit handle(%s)", params.Handle), op))
h := exec.GetHandle(params.Handle)
if h == nil {
return containers.NewCommitNotFound().WithPayload(&models.Error{Message: "container not found"})
}
if err := h.Commit(op, handler.handlerCtx.Session, params.WaitTime); err != nil {
op.Errorf("CommitHandler error on handle(%s) for %s: %s", h, h.ExecConfig.ID, err)
switch err := err.(type) {
case exec.ConcurrentAccessError:
return containers.NewCommitConflict().WithPayload(&models.Error{Message: err.Error()})
case exec.DevicesInUseError:
return containers.NewCommitConflict().WithPayload(&models.Error{Message: err.Error()})
default:
return containers.NewCommitDefault(http.StatusServiceUnavailable).WithPayload(&models.Error{Message: err.Error()})
}
}
return containers.NewCommitOK()
}
func (handler *ContainersHandlersImpl) RemoveContainerHandler(params containers.ContainerRemoveParams) middleware.Responder {
op := trace.NewOperation(context.Background(), params.ID)
defer trace.End(trace.Begin(params.ID, op))
// get the indicated container for removal
container := exec.Containers.Container(params.ID)
if container == nil {
return containers.NewContainerRemoveNotFound()
}
// NOTE: this should allowing batching of operations, as with Create, Start, Stop, et al
err := container.Remove(op, handler.handlerCtx.Session)
if err != nil {
switch err := err.(type) {
case exec.NotFoundError:
return containers.NewContainerRemoveNotFound()
case exec.RemovePowerError:
return containers.NewContainerRemoveConflict().WithPayload(&models.Error{Message: err.Error()})
default:
if f, ok := err.(types.HasFault); ok {
switch f.Fault().(type) {
case *types.HostNotConnected:
p := &models.Error{Message: "Couldn't remove container. The ESX host is temporarily disconnected. Please try again later."}
return containers.NewContainerRemoveInternalServerError().WithPayload(p)
}
}
return containers.NewContainerRemoveInternalServerError()
}
}
return containers.NewContainerRemoveOK()
}
func (handler *ContainersHandlersImpl) GetContainerInfoHandler(params containers.GetContainerInfoParams) middleware.Responder {
op := trace.NewOperation(context.Background(), params.ID)
defer trace.End(trace.Begin(params.ID, op))
container := exec.Containers.Container(params.ID)
if container == nil {
info := fmt.Sprintf("GetContainerInfoHandler ContainerCache miss for container(%s)", params.ID)
op.Errorf(info)
return containers.NewGetContainerInfoNotFound().WithPayload(&models.Error{Message: info})
}
// Refresh to get up to date network info
container.Refresh(op)
containerInfo := convertContainerToContainerInfo(container)
return containers.NewGetContainerInfoOK().WithPayload(containerInfo)
}
// type and funcs to provide sorting by created date
type containerByCreated []*models.ContainerInfo
func (r containerByCreated) Len() int { return len(r) }
func (r containerByCreated) Swap(i, j int) { r[i], r[j] = r[j], r[i] }
func (r containerByCreated) Less(i, j int) bool {
return r[i].ContainerConfig.CreateTime < r[j].ContainerConfig.CreateTime
}
func (handler *ContainersHandlersImpl) GetContainerListHandler(params containers.GetContainerListParams) middleware.Responder {
defer trace.End(trace.Begin(""))
var states []exec.State
var include func(*models.ContainerInfo) bool
if params.All != nil && !*params.All {
// we include Starting in the query as it's transient but will filter out those that don't transition to running
// before returning.
// TODO: this is here solely due to the lack of a structured means of queuing a background refresh and should be
// eliminated as soon as that's available. If we don't do this at this point in time then the caller must look at
// all containers or inspect the Starting one directly to trigger a refresh
states = append(states, exec.StateRunning, exec.StateStarting)
include = func(info *models.ContainerInfo) bool {
return info.ContainerConfig.State == exec.StateRunning.String()
}
}
containerVMs := exec.Containers.Containers(states)
containerList := make([]*models.ContainerInfo, 0, len(containerVMs))
for _, container := range containerVMs {
// convert to return model
info := convertContainerToContainerInfo(container)
if include == nil || include(info) {
containerList = append(containerList, info)
}
}
sort.Sort(sort.Reverse(containerByCreated(containerList)))
return containers.NewGetContainerListOK().WithPayload(containerList)
}
func (handler *ContainersHandlersImpl) ContainerSignalHandler(params containers.ContainerSignalParams) middleware.Responder {
op := trace.NewOperation(context.Background(), params.ID)
defer trace.End(trace.Begin(params.ID, op))
// NOTE: I feel that this should be in a Commit path for consistency
// it would allow phrasings such as:
// 1. join Volume to container
// 2. send HUP to primary process
// Only really relevant when we can connect networks or join volumes live
container := exec.Containers.Container(params.ID)
if container == nil {
return containers.NewContainerSignalNotFound().WithPayload(&models.Error{Message: fmt.Sprintf("container %s not found", params.ID)})
}
err := container.Signal(op, params.Signal)
if err != nil {
return containers.NewContainerSignalInternalServerError().WithPayload(&models.Error{Message: err.Error()})
}
return containers.NewContainerSignalOK()
}
func (handler *ContainersHandlersImpl) GetContainerStatsHandler(params containers.GetContainerStatsParams) middleware.Responder {
defer trace.End(trace.Begin(params.ID))
c := exec.Containers.Container(params.ID)
if c == nil {
return containers.NewGetContainerStatsNotFound()
}
r, w := io.Pipe()
enc := json.NewEncoder(w)
flusher := NewFlushingReader(r)
// operation that will log the stats subscription for this client
statsOp := trace.NewOperation(context.Background(), "container(%s) stats subscription", params.ID)
// currently all stats requests will be a subscription and it will
// be the responsibility of the caller to close the connection
// and there by release the subscription
ch, err := metrics.Supervisor.Subscribe(statsOp, c)
if err != nil {
statsOp.Errorf("unable to subscribe container(%s) to stats stream: %s", params.ID, err)
return containers.NewGetContainerStatsInternalServerError()
}
statsOp.Debugf("container(%s) stats stream subscribed", params.ID)
// closer will be run when the http transport is closed
cleaner := func() {
statsOp.Debugf("unsubscribing %s from stats", params.ID)
metrics.Supervisor.Unsubscribe(statsOp, c, ch)
closePipe(r, w)
}
// routine that will listen for new metrics and encode to provided output stream
// unsubscription or error will exit the routine
go func() {
for {
select {
case metric, ok := <-ch:
if !ok {
statsOp.Debugf("container stats complete for %s", params.ID)
return
}
err := enc.Encode(metric)
if err != nil {
statsOp.Errorf("encoding error [%s] for container(%s) stats - stream(%t)", err, params.ID, params.Stream)
return
}
}
}
}()
return NewStreamOutputHandler("containerStats").WithPayload(flusher, params.ID, cleaner)
}
func (handler *ContainersHandlersImpl) GetContainerLogsHandler(params containers.GetContainerLogsParams) middleware.Responder {
op := trace.NewOperation(context.Background(), "Getting logs for %s", params.ID)
defer trace.End(trace.Begin(params.ID, op))
container := exec.Containers.Container(params.ID)
if container == nil {
return containers.NewGetContainerLogsNotFound()
}
follow := false
tail := -1
since := int64(0)
if params.Follow != nil {
follow = *params.Follow
}
if params.Taillines != nil {
tail = int(*params.Taillines)
}
if *params.Since > 0 {
since = *params.Since
}
reader, err := container.LogReader(op, tail, follow, since)
if err != nil {
// Do not return an error here. It's a workaround for a panic similar to #2594
return containers.NewGetContainerLogsInternalServerError()
}
// containers with DataVersion > 0 will use updated output logging on the backend
if container.DataVersion > 0 {
ts := false
if params.Timestamp != nil {
ts = *params.Timestamp
}
// wrap the reader in a LogReader to deserialize persisted containerVM output
reader = iolog.NewLogReader(reader, ts)
}
detachableOut := NewFlushingReader(reader)
return NewStreamOutputHandler("containerLogs").WithPayload(detachableOut, params.ID, nil)
}
func (handler *ContainersHandlersImpl) ContainerWaitHandler(params containers.ContainerWaitParams) middleware.Responder {
defer trace.End(trace.Begin(fmt.Sprintf("%s:%d", params.ID, params.Timeout)))
// default context timeout in seconds
defaultTimeout := int64(containerWaitTimeout.Seconds())
// if we have a positive timeout specified then use it
if params.Timeout > 0 {
defaultTimeout = params.Timeout
}
timeout := time.Duration(defaultTimeout) * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
c := exec.Containers.Container(uid.Parse(params.ID).String())
if c == nil {
return containers.NewContainerWaitNotFound().WithPayload(&models.Error{
Message: fmt.Sprintf("container %s not found", params.ID),
})
}
select {
case <-c.WaitForState(exec.StateStopped):
containerInfo := convertContainerToContainerInfo(c)
return containers.NewContainerWaitOK().WithPayload(containerInfo)
case <-ctx.Done():
return containers.NewContainerWaitInternalServerError().WithPayload(&models.Error{
Message: fmt.Sprintf("ContainerWaitHandler(%s) Error: %s", params.ID, ctx.Err()),
})
}
}
func (handler *ContainersHandlersImpl) RenameContainerHandler(params containers.ContainerRenameParams) middleware.Responder {
op := trace.NewOperation(context.Background(), "Rename container to %s", params.Name)
h := exec.GetHandle(params.Handle)
if h == nil || h.ExecConfig == nil {
return containers.NewContainerRenameNotFound()
}
defer trace.End(trace.Begin(h.ExecConfig.ID, op))
// get the indicated container for rename
container := exec.Containers.Container(h.ExecConfig.ID)
if container == nil {
return containers.NewContainerRenameNotFound()
}
if container.ExecConfig.Name == params.Name {
err := &models.Error{
Message: fmt.Sprintf("renaming a container with the same name as its current name: %s", params.Name),
}
return containers.NewContainerRenameInternalServerError().WithPayload(err)
}
// rename on container version < supportVersionForRename is not supported
log.Debugf("The container DataVersion is: %d", h.DataVersion)
if h.DataVersion < feature.RenameSupportedVersion {
err := &models.Error{
Message: fmt.Sprintf("container %s does not support rename", container.ExecConfig.Name),
}
return containers.NewContainerRenameInternalServerError().WithPayload(err)
}
h = h.Rename(op, params.Name)
return containers.NewContainerRenameOK().WithPayload(h.String())
}
// utility function to convert from a Container type to the API Model ContainerInfo (which should prob be called ContainerDetail)
func convertContainerToContainerInfo(c *exec.Container) *models.ContainerInfo {
container := c.Info()
// ensure we have probably up-to-date info. Determine if we have transient state values
transient := false
if container.State() == exec.StateStarting || container.State() == exec.StateStopping {
transient = true
}
if container.State() != exec.StateStopped {
for _, endpoint := range container.ExecConfig.Networks {
if !endpoint.Static && ip.IsUnspecifiedIP(endpoint.Assigned.IP) {
// container has dynamic IP but we do not have a reported address
// shouldn't need multiple refreshes if multiple dhcps
transient = true
break
}
}
}
if transient {
op := trace.NewOperation(context.Background(), "state refresh triggered by a transient data state")
c.Refresh(op)
container = c.Info()
}
// convert the container type to the required model
info := &models.ContainerInfo{
ContainerConfig: &models.ContainerConfig{},
ProcessConfig: &models.ProcessConfig{},
VolumeConfig: make([]*models.VolumeConfig, 0),
Endpoints: make([]*models.EndpointConfig, 0),
DataVersion: int64(container.DataVersion),
}
// Populate volume information
for volName, mountSpec := range container.ExecConfig.Mounts {
vol := &models.VolumeConfig{
Name: volName,
MountPoint: mountSpec.Path,
ReadWrite: strings.Contains(strings.ToLower(mountSpec.Mode), "rw"),
Flags: make(map[string]string),
}
vol.Flags[constants.Mode] = mountSpec.Mode
info.VolumeConfig = append(info.VolumeConfig, vol)
}
ccid := container.ExecConfig.ID
info.ContainerConfig.ContainerID = ccid
state := container.State().String()
if container.MigrationError != nil {
state = "error"
info.ProcessConfig.ErrorMsg = fmt.Sprintf("Migration failed: %s", container.MigrationError.Error())
info.ProcessConfig.Status = state
}
info.ContainerConfig.State = state
info.ContainerConfig.LayerID = container.ExecConfig.LayerID
info.ContainerConfig.ImageID = container.ExecConfig.ImageID
info.ContainerConfig.RepoName = &container.ExecConfig.RepoName
info.ContainerConfig.CreateTime = container.ExecConfig.CreateTime
info.ContainerConfig.Names = []string{container.ExecConfig.Name}
info.ContainerConfig.RestartCount = int64(container.ExecConfig.Diagnostics.ResurrectionCount)
info.ContainerConfig.StorageSize = container.VMUnsharedDisk
if container.ExecConfig.Annotations != nil && len(container.ExecConfig.Annotations) > 0 {
info.ContainerConfig.Annotations = make(map[string]string)
for k, v := range container.ExecConfig.Annotations {
info.ContainerConfig.Annotations[k] = v
}
}
// in heavily loaded environments we were seeing a panic due to a missing
// session id in execConfig -- this has only manifested itself in short lived containers
// that were initilized via run
if session, exists := container.ExecConfig.Sessions[ccid]; exists {
info.ContainerConfig.Tty = &session.Tty
info.ContainerConfig.AttachStdin = &session.Attach
info.ContainerConfig.AttachStdout = &session.Attach
info.ContainerConfig.AttachStderr = &session.Attach
info.ContainerConfig.OpenStdin = &session.OpenStdin
// started is a string in the vmx that is not to be confused
// with started the datetime in the models.ContainerInfo
info.ProcessConfig.Status = session.Started
info.ProcessConfig.ExecPath = session.Cmd.Path
info.ProcessConfig.WorkingDir = &session.Cmd.Dir
info.ProcessConfig.ExecArgs = session.Cmd.Args
info.ProcessConfig.Env = session.Cmd.Env
info.ProcessConfig.ExitCode = int32(session.ExitStatus)
info.ProcessConfig.StartTime = session.StartTime
info.ProcessConfig.StopTime = session.StopTime
info.ProcessConfig.User = session.User
if session.Group != "" {
info.ProcessConfig.User = fmt.Sprintf("%s:%s", session.User, session.Group)
}
} else {
// log that sessionID is missing and print the ExecConfig
log.Errorf("Session ID is missing from execConfig, change version %s: %#v", c.Config.ChangeVersion, container.ExecConfig)
// panic if we are in debug / hopefully CI
if log.DebugLevel > 0 {
panic("nil session id")
}
}
info.HostConfig = &models.HostConfig{}
for _, endpoint := range container.ExecConfig.Networks {
ep := &models.EndpointConfig{
Address: "",
Container: ccid,
Gateway: "",
ID: endpoint.ID,
Name: endpoint.Name,
Ports: make([]string, 0),
Scope: endpoint.Network.Name,
Aliases: make([]string, 0),
Nameservers: make([]string, 0),
Trust: endpoint.Network.TrustLevel.String(),
Direct: endpoint.Network.Type == constants.ExternalScopeType,
}
if !ip.IsUnspecifiedIP(endpoint.Network.Gateway.IP) {
ep.Gateway = endpoint.Network.Gateway.String()
}
if !ip.IsUnspecifiedIP(endpoint.Assigned.IP) {
ep.Address = endpoint.Assigned.String()
}
if len(endpoint.Ports) > 0 {
ep.Ports = append(ep.Ports, endpoint.Ports...)
}
for _, alias := range endpoint.Network.Aliases {
parts := strings.Split(alias, ":")
if len(parts) > 1 {
ep.Aliases = append(ep.Aliases, parts[1])
} else {
ep.Aliases = append(ep.Aliases, parts[0])
}
}
for _, dns := range endpoint.Network.Nameservers {
ep.Nameservers = append(ep.Nameservers, dns.String())
}
info.Endpoints = append(info.Endpoints, ep)
}
return info
}

View File

@@ -1,76 +0,0 @@
// Copyright 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handlers
import (
"encoding/json"
"fmt"
"io"
log "github.com/Sirupsen/logrus"
"github.com/go-openapi/runtime/middleware"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations/events"
ple "github.com/vmware/vic/lib/portlayer/event/events"
"github.com/vmware/vic/lib/portlayer/exec"
"github.com/vmware/vic/pkg/trace"
"github.com/vmware/vic/pkg/uid"
)
// EventHandlerImpl is the receiver for all of the event handler methods
type EventsHandlerImpl struct {
handlerCtx *HandlerContext
}
// Configure assigns functions to all the exec api handlers
func (handler *EventsHandlerImpl) Configure(api *operations.PortLayerAPI, handlerCtx *HandlerContext) {
api.EventsGetEventsHandler = events.GetEventsHandlerFunc(handler.GetEventsHandler)
handler.handlerCtx = handlerCtx
}
// GetEventsHandler provides a stream of events
func (handler *EventsHandlerImpl) GetEventsHandler(params events.GetEventsParams) middleware.Responder {
defer trace.End(trace.Begin(""))
r, w := io.Pipe()
enc := json.NewEncoder(w)
flusher := NewFlushingReader(r)
// uid for subscription
id := uid.New().String()
sub := fmt.Sprintf("%s-%s", "PLE", id)
// currently only containerEvents will be streamed
topic := ple.NewEventType(ple.ContainerEvent{}).Topic()
// func to clean up the event stream
onClose := func() {
exec.Config.EventManager.Unsubscribe(topic, sub)
closePipe(r, w)
}
// subscribe to event stream
exec.Config.EventManager.Subscribe(topic, sub, func(ie ple.Event) {
err := enc.Encode(ie)
if err != nil {
log.Errorf("Encoding Error: %s", err.Error())
exec.Config.EventManager.Unsubscribe(topic, sub)
closePipe(r, w)
}
})
return NewStreamOutputHandler("events").WithPayload(flusher, sub, onClose)
}

View File

@@ -1,22 +0,0 @@
// Copyright 2016 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handlers
import "github.com/vmware/vic/pkg/vsphere/session"
// HandlerContext is set of shared objects for the port layer server handlers
type HandlerContext struct {
Session *session.Session
}

View File

@@ -1,317 +0,0 @@
// Copyright 2016 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handlers
import (
"context"
"fmt"
"io"
"time"
log "github.com/Sirupsen/logrus"
"github.com/go-openapi/runtime/middleware"
"github.com/vmware/vic/lib/apiservers/portlayer/models"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations/interaction"
"github.com/vmware/vic/lib/constants"
"github.com/vmware/vic/lib/portlayer/attach"
"github.com/vmware/vic/lib/portlayer/attach/communication"
"github.com/vmware/vic/lib/portlayer/exec"
"github.com/vmware/vic/pkg/trace"
)
// InteractionHandlersImpl is the receiver for all of the interaction handler methods
type InteractionHandlersImpl struct {
server *communication.Server
}
const (
// bump to 50 seconds for 30s is not enough for slow environment
interactionTimeout time.Duration = 50 * time.Second
attachStdinInitString = "v1c#>"
// in sync with lib/tether/tether_linux.go
// 115200 bps is 14.4 KB/s so use that
ioCopyBufferSize = 14 * 1024
)
func (i *InteractionHandlersImpl) Configure(api *operations.PortLayerAPI, _ *HandlerContext) {
api.InteractionInteractionJoinHandler = interaction.InteractionJoinHandlerFunc(i.JoinHandler)
api.InteractionInteractionBindHandler = interaction.InteractionBindHandlerFunc(i.BindHandler)
api.InteractionInteractionUnbindHandler = interaction.InteractionUnbindHandlerFunc(i.UnbindHandler)
api.InteractionContainerResizeHandler = interaction.ContainerResizeHandlerFunc(i.ContainerResizeHandler)
api.InteractionContainerSetStdinHandler = interaction.ContainerSetStdinHandlerFunc(i.ContainerSetStdinHandler)
api.InteractionContainerGetStdoutHandler = interaction.ContainerGetStdoutHandlerFunc(i.ContainerGetStdoutHandler)
api.InteractionContainerGetStderrHandler = interaction.ContainerGetStderrHandlerFunc(i.ContainerGetStderrHandler)
api.InteractionContainerCloseStdinHandler = interaction.ContainerCloseStdinHandlerFunc(i.ContainerCloseStdinHandler)
i.server = communication.NewServer("localhost", constants.AttachServerPort)
if err := i.server.Start(); err != nil {
log.Fatalf("Attach server unable to start: %s", err)
}
}
// JoinHandler calls the Join
func (i *InteractionHandlersImpl) JoinHandler(params interaction.InteractionJoinParams) middleware.Responder {
defer trace.End(trace.Begin(""))
handle := exec.HandleFromInterface(params.Config.Handle)
if handle == nil {
err := &models.Error{Message: "Failed to get the Handle"}
return interaction.NewInteractionJoinInternalServerError().WithPayload(err)
}
handleprime, err := attach.Join(handle)
if err != nil {
log.Errorf("%s", err.Error())
return interaction.NewInteractionJoinInternalServerError().WithPayload(
&models.Error{Message: err.Error()},
)
}
res := &models.InteractionJoinResponse{
Handle: exec.ReferenceFromHandle(handleprime),
}
return interaction.NewInteractionJoinOK().WithPayload(res)
}
// BindHandler calls the Bind
func (i *InteractionHandlersImpl) BindHandler(params interaction.InteractionBindParams) middleware.Responder {
defer trace.End(trace.Begin(""))
handle := exec.HandleFromInterface(params.Config.Handle)
if handle == nil {
err := &models.Error{Message: "Failed to get the Handle"}
return interaction.NewInteractionBindInternalServerError().WithPayload(err)
}
handleprime, err := attach.Bind(handle, params.Config.ID)
if err != nil {
log.Errorf("%s", err.Error())
return interaction.NewInteractionBindInternalServerError().WithPayload(
&models.Error{Message: err.Error()},
)
}
res := &models.InteractionBindResponse{
Handle: exec.ReferenceFromHandle(handleprime),
}
return interaction.NewInteractionBindOK().WithPayload(res)
}
// UnbindHandler calls the Unbind
func (i *InteractionHandlersImpl) UnbindHandler(params interaction.InteractionUnbindParams) middleware.Responder {
defer trace.End(trace.Begin(""))
handle := exec.HandleFromInterface(params.Config.Handle)
if handle == nil {
err := &models.Error{Message: "Failed to get the Handle"}
return interaction.NewInteractionUnbindInternalServerError().WithPayload(err)
}
handleprime, err := attach.Unbind(handle, params.Config.ID)
if err != nil {
log.Errorf("%s", err.Error())
return interaction.NewInteractionUnbindInternalServerError().WithPayload(
&models.Error{Message: err.Error()},
)
}
res := &models.InteractionUnbindResponse{
Handle: exec.ReferenceFromHandle(handleprime),
}
return interaction.NewInteractionUnbindOK().WithPayload(res)
}
// ContainerResizeHandler calls resize
func (i *InteractionHandlersImpl) ContainerResizeHandler(params interaction.ContainerResizeParams) middleware.Responder {
defer trace.End(trace.Begin(params.ID))
ctx, cancel := context.WithTimeout(context.Background(), 0)
defer cancel()
// See whether there is an active session to the container
session, err := i.server.Interaction(ctx, params.ID)
if err != nil {
// just note the warning and return, resize requires an active connection
log.Warnf("No resize connection found (id: %s): %s", params.ID, err)
return interaction.NewContainerResizeOK()
}
// Request a resize
cWidth := uint32(params.Width)
cHeight := uint32(params.Height)
if err = session.Resize(cWidth, cHeight, 0, 0); err != nil {
log.Errorf("%s", err.Error())
return interaction.NewContainerResizeInternalServerError().WithPayload(
&models.Error{Message: err.Error()},
)
}
return interaction.NewContainerResizeOK()
}
// ContainerSetStdinHandler returns the stdin
func (i *InteractionHandlersImpl) ContainerSetStdinHandler(params interaction.ContainerSetStdinParams) middleware.Responder {
defer trace.End(trace.Begin(params.ID))
ctx, cancel := context.WithTimeout(context.Background(), interactionTimeout)
if params.Deadline != nil {
ctx, cancel = context.WithDeadline(context.Background(), time.Time(*params.Deadline))
}
defer cancel()
session, err := i.server.Interaction(ctx, params.ID)
if err != nil {
log.Errorf("%s", err.Error())
e := &models.Error{
Message: fmt.Sprintf("No stdin connection found (id: %s): %s", params.ID, err.Error()),
}
return interaction.NewContainerSetStdinNotFound().WithPayload(e)
}
detachableIn := NewFlushingReaderWithInitBytes(params.RawStream, []byte(attachStdinInitString))
_, err = io.Copy(session.Stdin(), detachableIn)
if err != nil {
log.Errorf("Copy@ContainerSetStdinHandler returned %s", err.Error())
/*
// FIXME(caglar10ur): need a way to differentiate detach from pipe
// Close the stdin if we get an EOF in the middle of the stream
if err == io.ErrUnexpectedEOF {
if err = session.CloseStdin(); err != nil {
log.Errorf("CloseStdin@ContainerSetStdinHandler failed with %s", err.Error())
} else {
log.Infof("CloseStdin@ContainerSetStdinHandler succeeded")
}
}
// FIXME(caglar10ur): Do not return an error here - https://github.com/vmware/vic/issues/2594
e := &models.Error{
Message: fmt.Sprintf("Error copying stdin (id: %s): %s", params.ID, err.Error()),
}
return interaction.NewContainerSetStdinInternalServerError().WithPayload(e)
*/
}
// close the stream
params.RawStream.Close()
log.Debugf("Done copying stdin")
return interaction.NewContainerSetStdinOK()
}
// ContainerCloseStdinHandler closes the stdin, it returns an error if there is no active connection between portlayer and the tether
func (i *InteractionHandlersImpl) ContainerCloseStdinHandler(params interaction.ContainerCloseStdinParams) middleware.Responder {
defer trace.End(trace.Begin(params.ID))
ctx, cancel := context.WithTimeout(context.Background(), interactionTimeout)
defer cancel()
session, err := i.server.Interaction(ctx, params.ID)
if err != nil {
log.Errorf("%s", err.Error())
e := &models.Error{
Message: fmt.Sprintf("No stdin connection found (id: %s): %s", params.ID, err.Error()),
}
return interaction.NewContainerCloseStdinNotFound().WithPayload(e)
}
if err = session.CloseStdin(); err != nil {
log.Errorf("%s", err.Error())
return interaction.NewContainerCloseStdinInternalServerError().WithPayload(
&models.Error{Message: err.Error()},
)
}
return interaction.NewContainerCloseStdinOK()
}
// ContainerGetStdoutHandler returns the stdout
func (i *InteractionHandlersImpl) ContainerGetStdoutHandler(params interaction.ContainerGetStdoutParams) middleware.Responder {
defer trace.End(trace.Begin(params.ID))
ctx, cancel := context.WithTimeout(context.Background(), interactionTimeout)
if params.Deadline != nil {
ctx, cancel = context.WithDeadline(context.Background(), time.Time(*params.Deadline))
}
defer cancel()
session, err := i.server.Interaction(ctx, params.ID)
if err != nil {
log.Errorf("%s", err.Error())
// FIXME (caglar10ur): Do not return an error here - https://github.com/vmware/vic/issues/2594
/*
e := &models.Error{
Message: fmt.Sprintf("No stdout connection found (id: %s): %s", params.ID, err.Error()),
}
return interaction.NewContainerGetStdoutNotFound().WithPayload(e)
*/
return interaction.NewContainerGetStdoutNotFound()
}
return NewStreamOutputHandler("stdout").WithPayload(
NewFlushingReader(
session.Stdout(),
),
params.ID,
nil,
)
}
// ContainerGetStderrHandler returns the stderr
func (i *InteractionHandlersImpl) ContainerGetStderrHandler(params interaction.ContainerGetStderrParams) middleware.Responder {
defer trace.End(trace.Begin(params.ID))
ctx, cancel := context.WithTimeout(context.Background(), interactionTimeout)
if params.Deadline != nil {
ctx, cancel = context.WithDeadline(context.Background(), time.Time(*params.Deadline))
}
defer cancel()
session, err := i.server.Interaction(ctx, params.ID)
if err != nil {
log.Errorf("%s", err.Error())
// FIXME (caglar10ur): Do not return an error here - https://github.com/vmware/vic/issues/2594
/*
e := &models.Error{
Message: fmt.Sprintf("No stderr connection found (id: %s): %s", params.ID, err.Error()),
}
return interaction.NewContainerGetStderrNotFound().WithPayload(e)
*/
return interaction.NewContainerGetStderrNotFound()
}
return NewStreamOutputHandler("stderr").WithPayload(
NewFlushingReader(
session.Stderr(),
),
params.ID,
nil,
)
}

View File

@@ -1,84 +0,0 @@
// Copyright 2016 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handlers
import (
"io"
"strings"
"testing"
log "github.com/Sirupsen/logrus"
)
func init() {
log.SetLevel(log.DebugLevel)
}
// A ChunkReader reads C bytes from R in each Read
type ChunkReader struct {
R io.Reader
C int64
}
func NewChunkReader(r io.Reader, c int64) io.Reader {
return &ChunkReader{r, c}
}
func (l *ChunkReader) Read(p []byte) (n int, err error) {
return l.R.Read(p[:l.C])
}
func TestNewFlushingReaderWithInitBytes(t *testing.T) {
var tests = []struct {
in string
err error
}{
{attachStdinInitString, nil},
{attachStdinInitString + "# uname -a", nil},
{"@^" + attachStdinInitString + "# uname -a", nil},
{attachStdinInitString[:2], io.EOF},
}
for _, test := range tests {
for i := 1; i < len(test.in)+1; i++ {
buf := make([]byte, 64)
lr := NewChunkReader(strings.NewReader(test.in), int64(i))
f := NewFlushingReaderWithInitBytes(lr, []byte(attachStdinInitString))
_, err := f.readDetectInit(buf)
if err != test.err {
t.Error(err)
}
}
}
}
func TestNewFlushingReaderWithInitBytesPanic(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("The function did not panic")
}
}()
// pass a smaller buffer to cause it to panic
str := attachStdinInitString
var buf []byte
lr := NewChunkReader(strings.NewReader(str), 1)
f := NewFlushingReaderWithInitBytes(lr, []byte(attachStdinInitString))
f.readDetectInit(buf)
}

View File

@@ -1,101 +0,0 @@
// Copyright 2016 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handlers
import (
"context"
"net/http"
log "github.com/Sirupsen/logrus"
"github.com/go-openapi/runtime/middleware"
"github.com/vmware/vic/lib/apiservers/portlayer/models"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations/kv"
"github.com/vmware/vic/lib/portlayer/store"
"github.com/vmware/vic/pkg/kvstore"
"github.com/vmware/vic/pkg/trace"
)
type KvHandlersImpl struct {
defaultStore kvstore.KeyValueStore
}
func (handler *KvHandlersImpl) Configure(api *operations.PortLayerAPI, handlerCtx *HandlerContext) {
api.KvGetValueHandler = kv.GetValueHandlerFunc(handler.GetValueHandler)
api.KvPutValueHandler = kv.PutValueHandlerFunc(handler.PutValueHandler)
api.KvDeleteValueHandler = kv.DeleteValueHandlerFunc(handler.DeleteValueHandler)
// Get the APIKV store -- it should always be present since it's
// initialized when the portlayer starts
// #nosec: Errors unhandled.
s, _ := store.Store(store.APIKV)
handler.defaultStore = s
}
func (handler *KvHandlersImpl) GetValueHandler(params kv.GetValueParams) middleware.Responder {
defer trace.End(trace.Begin(params.Key))
val, err := handler.defaultStore.Get(params.Key)
if err != nil {
switch err {
case kvstore.ErrKeyNotFound:
return kv.NewGetValueNotFound()
default:
log.Errorf("Error Getting Key/Value: %s", err.Error())
return kv.NewGetValueInternalServerError().WithPayload(&models.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
})
}
}
return kv.NewGetValueOK().WithPayload(&models.KeyValue{Key: params.Key, Value: string(val)})
}
func (handler *KvHandlersImpl) PutValueHandler(params kv.PutValueParams) middleware.Responder {
defer trace.End(trace.Begin(params.KeyValue.Key))
err := handler.defaultStore.Put(
context.Background(),
params.KeyValue.Key,
[]byte(params.KeyValue.Value))
if err != nil {
log.Errorf("Error Setting Key/Value: %s", err.Error())
return kv.NewGetValueInternalServerError().WithPayload(&models.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
})
}
return kv.NewPutValueOK()
}
func (handler *KvHandlersImpl) DeleteValueHandler(params kv.DeleteValueParams) middleware.Responder {
defer trace.End(trace.Begin(params.Key))
err := handler.defaultStore.Delete(trace.NewOperation(context.Background(), "DeleteValue"), params.Key)
if err != nil {
switch err {
case kvstore.ErrKeyNotFound:
return kv.NewDeleteValueNotFound()
default:
log.Errorf("Error deleting Key/Value: %s", err.Error())
return kv.NewGetValueInternalServerError().WithPayload(&models.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
})
}
}
return kv.NewDeleteValueOK()
}

View File

@@ -1,106 +0,0 @@
// Copyright 2016 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handlers
import (
"github.com/go-openapi/runtime/middleware"
"github.com/vmware/vic/lib/apiservers/portlayer/models"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations/logging"
"github.com/vmware/vic/lib/portlayer/exec"
portlayer "github.com/vmware/vic/lib/portlayer/logging"
"github.com/vmware/vic/pkg/trace"
)
// LoggingHandlersImpl is the receiver for all of the logging handler methods
type LoggingHandlersImpl struct {
}
// Configure initializes the handler
func (i *LoggingHandlersImpl) Configure(api *operations.PortLayerAPI, _ *HandlerContext) {
api.LoggingLoggingJoinHandler = logging.LoggingJoinHandlerFunc(i.JoinHandler)
api.LoggingLoggingBindHandler = logging.LoggingBindHandlerFunc(i.BindHandler)
api.LoggingLoggingUnbindHandler = logging.LoggingUnbindHandlerFunc(i.UnbindHandler)
}
// JoinHandler calls the Join
func (i *LoggingHandlersImpl) JoinHandler(params logging.LoggingJoinParams) middleware.Responder {
defer trace.End(trace.Begin(""))
handle := exec.HandleFromInterface(params.Config.Handle)
if handle == nil {
err := &models.Error{Message: "Failed to get the Handle"}
return logging.NewLoggingJoinInternalServerError().WithPayload(err)
}
handleprime, err := portlayer.Join(handle)
if err != nil {
return logging.NewLoggingJoinInternalServerError().WithPayload(
&models.Error{Message: err.Error()},
)
}
res := &models.LoggingJoinResponse{
Handle: exec.ReferenceFromHandle(handleprime),
}
return logging.NewLoggingJoinOK().WithPayload(res)
}
// BindHandler calls the Bind
func (i *LoggingHandlersImpl) BindHandler(params logging.LoggingBindParams) middleware.Responder {
defer trace.End(trace.Begin(""))
handle := exec.HandleFromInterface(params.Config.Handle)
if handle == nil {
err := &models.Error{Message: "Failed to get the Handle"}
return logging.NewLoggingBindInternalServerError().WithPayload(err)
}
handleprime, err := portlayer.Bind(handle)
if err != nil {
return logging.NewLoggingBindInternalServerError().WithPayload(
&models.Error{Message: err.Error()},
)
}
res := &models.LoggingBindResponse{
Handle: exec.ReferenceFromHandle(handleprime),
}
return logging.NewLoggingBindOK().WithPayload(res)
}
// UnbindHandler calls the Unbind
func (i *LoggingHandlersImpl) UnbindHandler(params logging.LoggingUnbindParams) middleware.Responder {
defer trace.End(trace.Begin(""))
handle := exec.HandleFromInterface(params.Config.Handle)
if handle == nil {
err := &models.Error{Message: "Failed to get the Handle"}
return logging.NewLoggingUnbindInternalServerError().WithPayload(err)
}
handleprime, err := portlayer.Unbind(handle)
if err != nil {
return logging.NewLoggingUnbindInternalServerError().WithPayload(
&models.Error{Message: err.Error()},
)
}
res := &models.LoggingUnbindResponse{
Handle: exec.ReferenceFromHandle(handleprime),
}
return logging.NewLoggingUnbindOK().WithPayload(res)
}

View File

@@ -1,58 +0,0 @@
// Copyright 2016 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handlers
import (
"context"
"github.com/go-openapi/runtime/middleware"
"github.com/vmware/vic/lib/apiservers/portlayer/models"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations/misc"
"github.com/vmware/vic/lib/portlayer/exec"
)
// MiscHandlersImpl is the receiver for all the misc handler methods
type MiscHandlersImpl struct{}
// Configure assigns functions to all the miscellaneous api handlers
func (h *MiscHandlersImpl) Configure(api *operations.PortLayerAPI, handlerCtx *HandlerContext) {
api.MiscPingHandler = misc.PingHandlerFunc(h.Ping)
api.MiscGetVCHInfoHandler = misc.GetVCHInfoHandlerFunc(h.GetVCHInfo)
}
// Ping sends an OK response to let the client know the server is up
func (h *MiscHandlersImpl) Ping(param misc.PingParams) middleware.Responder {
return misc.NewPingOK().WithPayload("OK")
}
// GetVCHInfo returns VCH-related info for a `docker info` call
func (h *MiscHandlersImpl) GetVCHInfo(params misc.GetVCHInfoParams) middleware.Responder {
vch := exec.GetVCHstats(context.Background())
vchInfo := &models.VCHInfo{
CPUMhz: vch.CPULimit,
Memory: vch.MemoryLimit,
CPUUsage: vch.CPUUsage,
MemUsage: vch.MemoryUsage,
HostOS: exec.Config.HostOS,
HostOSVersion: exec.Config.HostOSVersion,
HostProductName: exec.Config.HostProductName,
}
return misc.NewGetVCHInfoOK().WithPayload(vchInfo)
}

View File

@@ -1,401 +0,0 @@
// Copyright 2016-2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handlers
import (
"context"
"fmt"
"net"
"net/http"
log "github.com/Sirupsen/logrus"
"github.com/go-openapi/runtime/middleware"
"github.com/vmware/vic/lib/apiservers/portlayer/models"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations/scopes"
"github.com/vmware/vic/lib/portlayer/exec"
"github.com/vmware/vic/lib/portlayer/network"
"github.com/vmware/vic/pkg/ip"
"github.com/vmware/vic/pkg/trace"
)
// ScopesHandlersImpl is the receiver for all of the storage handler methods
type ScopesHandlersImpl struct {
netCtx *network.Context
handlerCtx *HandlerContext
}
// Configure assigns functions to all the scopes api handlers
func (handler *ScopesHandlersImpl) Configure(api *operations.PortLayerAPI, handlerCtx *HandlerContext) {
api.ScopesCreateScopeHandler = scopes.CreateScopeHandlerFunc(handler.ScopesCreate)
api.ScopesDeleteScopeHandler = scopes.DeleteScopeHandlerFunc(handler.ScopesDelete)
api.ScopesListAllHandler = scopes.ListAllHandlerFunc(handler.ScopesListAll)
api.ScopesListHandler = scopes.ListHandlerFunc(handler.ScopesList)
api.ScopesGetContainerEndpointsHandler = scopes.GetContainerEndpointsHandlerFunc(handler.ScopesGetContainerEndpoints)
api.ScopesAddContainerHandler = scopes.AddContainerHandlerFunc(handler.ScopesAddContainer)
api.ScopesRemoveContainerHandler = scopes.RemoveContainerHandlerFunc(handler.ScopesRemoveContainer)
api.ScopesBindContainerHandler = scopes.BindContainerHandlerFunc(handler.ScopesBindContainer)
api.ScopesUnbindContainerHandler = scopes.UnbindContainerHandlerFunc(handler.ScopesUnbindContainer)
handler.netCtx = network.DefaultContext
handler.handlerCtx = handlerCtx
}
func parseScopeConfig(cfg *models.ScopeConfig) (subnet *net.IPNet, gateway net.IP, dns []net.IP, annotations map[string]string, err error) {
if cfg.Subnet != "" {
if _, subnet, err = net.ParseCIDR(cfg.Subnet); err != nil {
return
}
}
gateway = net.IPv4(0, 0, 0, 0)
if cfg.Gateway != "" {
if gateway = net.ParseIP(cfg.Gateway); gateway == nil {
err = fmt.Errorf("invalid gateway")
return
}
}
dns = make([]net.IP, len(cfg.DNS))
for i, d := range cfg.DNS {
dns[i] = net.ParseIP(d)
if dns[i] == nil {
err = fmt.Errorf("invalid dns entry")
return
}
}
// Parse annotations
if len(cfg.Annotations) > 0 {
annotations = make(map[string]string)
for k, v := range cfg.Annotations {
annotations[k] = v
}
}
return
}
func (handler *ScopesHandlersImpl) listScopes(idName string) ([]*models.ScopeConfig, error) {
defer trace.End(trace.Begin(idName))
scs, err := handler.netCtx.Scopes(context.Background(), &idName)
if err != nil {
return nil, err
}
cfgs := make([]*models.ScopeConfig, len(scs))
for i, s := range scs {
cfgs[i] = toScopeConfig(s)
}
return cfgs, nil
}
func errorPayload(err error) *models.Error {
return &models.Error{Message: err.Error()}
}
func (handler *ScopesHandlersImpl) ScopesCreate(params scopes.CreateScopeParams) middleware.Responder {
defer trace.End(trace.Begin(""))
cfg := params.Config
if cfg.ScopeType == "external" {
return scopes.NewCreateScopeDefault(http.StatusServiceUnavailable).WithPayload(
&models.Error{Message: "cannot create external networks"})
}
subnet, gateway, dns, annotations, err := parseScopeConfig(cfg)
if err != nil {
return scopes.NewCreateScopeDefault(http.StatusServiceUnavailable).WithPayload(
errorPayload(err))
}
scopeData := &network.ScopeData{
ScopeType: cfg.ScopeType,
Name: cfg.Name,
Subnet: subnet,
Gateway: gateway,
DNS: dns,
Pools: cfg.IPAM,
Annotations: annotations,
Internal: cfg.Internal,
}
s, err := handler.netCtx.NewScope(context.Background(), scopeData)
if _, ok := err.(network.DuplicateResourceError); ok {
return scopes.NewCreateScopeConflict()
}
if err != nil {
return scopes.NewCreateScopeDefault(http.StatusServiceUnavailable).WithPayload(
errorPayload(err))
}
return scopes.NewCreateScopeCreated().WithPayload(toScopeConfig(s))
}
func (handler *ScopesHandlersImpl) ScopesDelete(params scopes.DeleteScopeParams) middleware.Responder {
defer trace.End(trace.Begin(params.IDName))
if err := handler.netCtx.DeleteScope(context.Background(), params.IDName); err != nil {
switch err := err.(type) {
case network.ResourceNotFoundError:
return scopes.NewDeleteScopeNotFound().WithPayload(errorPayload(err))
default:
return scopes.NewDeleteScopeInternalServerError().WithPayload(errorPayload(err))
}
}
return scopes.NewDeleteScopeOK()
}
func (handler *ScopesHandlersImpl) ScopesListAll(params scopes.ListAllParams) middleware.Responder {
defer trace.End(trace.Begin(""))
cfgs, err := handler.listScopes("")
if err != nil {
return scopes.NewListDefault(http.StatusServiceUnavailable).WithPayload(errorPayload(err))
}
return scopes.NewListAllOK().WithPayload(cfgs)
}
func (handler *ScopesHandlersImpl) ScopesList(params scopes.ListParams) middleware.Responder {
defer trace.End(trace.Begin("ScopesList"))
cfgs, err := handler.listScopes(params.IDName)
if _, ok := err.(network.ResourceNotFoundError); ok {
return scopes.NewListNotFound().WithPayload(errorPayload(err))
}
return scopes.NewListOK().WithPayload(cfgs)
}
func (handler *ScopesHandlersImpl) ScopesGetContainerEndpoints(params scopes.GetContainerEndpointsParams) middleware.Responder {
defer trace.End(trace.Begin(params.HandleOrID))
cid := params.HandleOrID
// lookup by handle
h := exec.GetHandle(cid)
if h != nil {
cid = h.ExecConfig.ID
}
c := handler.netCtx.Container(cid)
if c == nil {
return scopes.NewGetContainerEndpointsNotFound().WithPayload(errorPayload(fmt.Errorf("container not found")))
}
eps := c.Endpoints()
ecs := make([]*models.EndpointConfig, len(eps))
for i, e := range eps {
ecs[i] = toEndpointConfig(e)
}
return scopes.NewGetContainerEndpointsOK().WithPayload(ecs)
}
func (handler *ScopesHandlersImpl) ScopesAddContainer(params scopes.AddContainerParams) middleware.Responder {
defer trace.End(trace.Begin(fmt.Sprintf("handle(%s)", params.Config.Handle)))
h := exec.GetHandle(params.Config.Handle)
if h == nil {
return scopes.NewAddContainerNotFound().WithPayload(&models.Error{Message: "container not found"})
}
err := func() error {
addr := params.Config.NetworkConfig.Address
var ip net.IP
if addr != "" {
ip = net.ParseIP(addr)
if ip == nil {
return fmt.Errorf("invalid ip address %q", addr)
}
}
if len(params.Config.NetworkConfig.Aliases) > 0 {
log.Debugf("Links/Aliases: %#v", params.Config.NetworkConfig.Aliases)
}
options := &network.AddContainerOptions{
Scope: params.Config.NetworkConfig.NetworkName,
IP: ip,
Aliases: params.Config.NetworkConfig.Aliases,
Ports: params.Config.NetworkConfig.Ports,
}
return handler.netCtx.AddContainer(h, options)
}()
if err != nil {
if _, ok := err.(*network.ResourceNotFoundError); ok {
return scopes.NewAddContainerNotFound().WithPayload(errorPayload(err))
}
return scopes.NewAddContainerInternalServerError().WithPayload(errorPayload(err))
}
return scopes.NewAddContainerOK().WithPayload(h.String())
}
func (handler *ScopesHandlersImpl) ScopesRemoveContainer(params scopes.RemoveContainerParams) middleware.Responder {
defer trace.End(trace.Begin(fmt.Sprintf("handle(%s)", params.Handle)))
h := exec.GetHandle(params.Handle)
if h == nil {
return scopes.NewRemoveContainerNotFound().WithPayload(&models.Error{Message: "container not found"})
}
if err := handler.netCtx.RemoveContainer(h, params.Scope); err != nil {
if _, ok := err.(*network.ResourceNotFoundError); ok {
return scopes.NewRemoveContainerNotFound().WithPayload(errorPayload(err))
}
return scopes.NewRemoveContainerInternalServerError().WithPayload(errorPayload(err))
}
return scopes.NewRemoveContainerOK().WithPayload(h.String())
}
func (handler *ScopesHandlersImpl) ScopesBindContainer(params scopes.BindContainerParams) middleware.Responder {
op := trace.NewOperation(context.Background(), params.Handle)
defer trace.End(trace.Begin(fmt.Sprintf("handle(%s)", params.Handle), op))
h := exec.GetHandle(params.Handle)
if h == nil {
return scopes.NewBindContainerNotFound().WithPayload(&models.Error{Message: "container not found"})
}
var endpoints []*network.Endpoint
var err error
if endpoints, err = handler.netCtx.BindContainer(op, h); err != nil {
switch err := err.(type) {
case network.ResourceNotFoundError:
return scopes.NewBindContainerNotFound().WithPayload(errorPayload(err))
default:
return scopes.NewBindContainerInternalServerError().WithPayload(errorPayload(err))
}
}
res := &models.BindContainerResponse{
Handle: h.String(),
Endpoints: make([]*models.EndpointConfig, len(endpoints)),
}
for i, e := range endpoints {
res.Endpoints[i] = toEndpointConfig(e)
}
return scopes.NewBindContainerOK().WithPayload(res)
}
func (handler *ScopesHandlersImpl) ScopesUnbindContainer(params scopes.UnbindContainerParams) middleware.Responder {
op := trace.NewOperation(context.Background(), params.Handle)
defer trace.End(trace.Begin(fmt.Sprintf("handle(%s)", params.Handle), op))
h := exec.GetHandle(params.Handle)
if h == nil {
return scopes.NewUnbindContainerNotFound()
}
var endpoints []*network.Endpoint
var err error
if endpoints, err = handler.netCtx.UnbindContainer(op, h); err != nil {
switch err := err.(type) {
case network.ResourceNotFoundError:
return scopes.NewUnbindContainerNotFound().WithPayload(errorPayload(err))
default:
return scopes.NewUnbindContainerInternalServerError().WithPayload(errorPayload(err))
}
}
res := &models.UnbindContainerResponse{
Handle: h.String(),
Endpoints: make([]*models.EndpointConfig, len(endpoints)),
}
for i, e := range endpoints {
res.Endpoints[i] = toEndpointConfig(e)
}
return scopes.NewUnbindContainerOK().WithPayload(res)
}
func toScopeConfig(scope *network.Scope) *models.ScopeConfig {
subnet := ""
if !ip.IsUnspecifiedIP(scope.Subnet().IP) {
subnet = scope.Subnet().String()
}
gateway := ""
if !scope.Gateway().IsUnspecified() {
gateway = scope.Gateway().String()
}
sc := &models.ScopeConfig{
ID: scope.ID().String(),
Name: scope.Name(),
ScopeType: scope.Type(),
Subnet: subnet,
Gateway: gateway,
Internal: scope.Internal(),
}
var pools []string
for _, p := range scope.Pools() {
pools = append(pools, p.String())
}
sc.IPAM = pools
if len(sc.IPAM) == 0 {
sc.IPAM = []string{subnet}
}
eps := scope.Endpoints()
sc.Endpoints = make([]*models.EndpointConfig, len(eps))
for i, e := range eps {
sc.Endpoints[i] = toEndpointConfig(e)
}
sc.Annotations = make(map[string]string)
annotations := scope.Annotations()
for k, v := range annotations {
sc.Annotations[k] = v
}
return sc
}
func toEndpointConfig(e *network.Endpoint) *models.EndpointConfig {
addr := ""
if !ip.IsUnspecifiedIP(e.IP()) {
addr = e.IP().String()
}
ports := e.Ports()
ecports := make([]string, len(ports))
for i, p := range e.Ports() {
ecports[i] = p.FullString()
}
return &models.EndpointConfig{
Address: addr,
Container: e.ID().String(),
ID: e.ID().String(),
Name: e.Name(),
Scope: e.Scope().Name(),
Ports: ecports,
}
}

View File

@@ -1,860 +0,0 @@
// Copyright 2016-2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handlers
import (
"context"
"fmt"
"net/http"
"net/url"
"os"
"strconv"
log "github.com/Sirupsen/logrus"
"github.com/go-openapi/runtime/middleware"
"github.com/vmware/vic/lib/apiservers/portlayer/models"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations/storage"
"github.com/vmware/vic/lib/archive"
epl "github.com/vmware/vic/lib/portlayer/exec"
spl "github.com/vmware/vic/lib/portlayer/storage"
"github.com/vmware/vic/lib/portlayer/storage/container"
"github.com/vmware/vic/lib/portlayer/storage/image"
vsimage "github.com/vmware/vic/lib/portlayer/storage/image/vsphere"
"github.com/vmware/vic/lib/portlayer/storage/volume"
"github.com/vmware/vic/lib/portlayer/storage/volume/nfs"
vsvolume "github.com/vmware/vic/lib/portlayer/storage/volume/vsphere"
"github.com/vmware/vic/lib/portlayer/storage/vsphere"
"github.com/vmware/vic/lib/portlayer/util"
"github.com/vmware/vic/pkg/trace"
"github.com/vmware/vic/pkg/vsphere/datastore"
)
// StorageHandlersImpl is the receiver for all of the storage handler methods
type StorageHandlersImpl struct {
imageCache *image.NameLookupCache
volumeCache *volume.VolumeLookupCache
}
const (
nfsScheme = "nfs"
dsScheme = "ds"
uidQueryKey = "uid"
gidQueryKey = "gid"
)
// Configure assigns functions to all the storage api handlers
func (h *StorageHandlersImpl) Configure(api *operations.PortLayerAPI, handlerCtx *HandlerContext) {
var err error
op := trace.NewOperation(context.Background(), "configure storage layer")
if len(spl.Config.ImageStores) == 0 {
op.Panicf("No image stores provided; unable to instantiate storage layer")
}
imageStoreURL := spl.Config.ImageStores[0]
// TODO: support multiple image stores. Right now we only support the first one
if len(spl.Config.ImageStores) > 1 {
op.Warnf("Multiple image stores found. Multiple image stores are not yet supported. Using [%s] %s", imageStoreURL.Host, imageStoreURL.Path)
}
imageStore, err := vsimage.NewImageStore(op, handlerCtx.Session, &imageStoreURL)
if err != nil {
op.Panicf("Cannot instantiate storage layer: %s", err)
}
// The imagestore is implemented via a cache which is backed via an
// implementation that writes to disks. The cache is used to avoid
// expensive metadata lookups.
h.imageCache = image.NewLookupCache(imageStore)
spl.RegisterImporter(op, imageStoreURL.String(), imageStore)
spl.RegisterExporter(op, imageStoreURL.String(), imageStore)
containerStore, err := container.NewContainerStore(op, handlerCtx.Session, h.imageCache)
if err != nil {
op.Panicf("Couldn't create container store: %s", err.Error())
}
spl.RegisterImporter(op, "container", containerStore)
spl.RegisterExporter(op, "container", containerStore)
// add the volume stores, errors are logged within this function.
h.configureVolumeStores(op, handlerCtx)
api.StorageCreateImageStoreHandler = storage.CreateImageStoreHandlerFunc(h.CreateImageStore)
api.StorageGetImageHandler = storage.GetImageHandlerFunc(h.GetImage)
api.StorageListImagesHandler = storage.ListImagesHandlerFunc(h.ListImages)
api.StorageWriteImageHandler = storage.WriteImageHandlerFunc(h.WriteImage)
api.StorageImageJoinHandler = storage.ImageJoinHandlerFunc(h.ImageJoin)
api.StorageDeleteImageHandler = storage.DeleteImageHandlerFunc(h.DeleteImage)
api.StorageVolumeStoresListHandler = storage.VolumeStoresListHandlerFunc(h.VolumeStoresList)
api.StorageCreateVolumeHandler = storage.CreateVolumeHandlerFunc(h.CreateVolume)
api.StorageRemoveVolumeHandler = storage.RemoveVolumeHandlerFunc(h.RemoveVolume)
api.StorageVolumeJoinHandler = storage.VolumeJoinHandlerFunc(h.VolumeJoin)
api.StorageListVolumesHandler = storage.ListVolumesHandlerFunc(h.VolumesList)
api.StorageGetVolumeHandler = storage.GetVolumeHandlerFunc(h.GetVolume)
api.StorageExportArchiveHandler = storage.ExportArchiveHandlerFunc(h.ExportArchive)
api.StorageImportArchiveHandler = storage.ImportArchiveHandlerFunc(h.ImportArchive)
api.StorageStatPathHandler = storage.StatPathHandlerFunc(h.StatPath)
}
func (h *StorageHandlersImpl) configureVolumeStores(op trace.Operation, handlerCtx *HandlerContext) {
var (
vs volume.VolumeStorer
err error
)
h.volumeCache = volume.NewVolumeLookupCache(op)
// register the pseudo-store to handle the generic "volume" store name
spl.RegisterImporter(op, "volume", h.volumeCache)
spl.RegisterExporter(op, "volume", h.volumeCache)
// Configure the datastores
// Each volume store name maps to a datastore + path, which can be referred to by the name.
for name, dsurl := range spl.Config.VolumeLocations {
switch dsurl.Scheme {
case nfsScheme:
vs, err = createNFSVolumeStore(op, dsurl, name)
case dsScheme:
vs, err = createVsphereVolumeStore(op, dsurl, name, handlerCtx)
default:
err = fmt.Errorf("unknown scheme for %s", dsurl.String())
op.Error(err)
}
// if an error has been logged skip volume store cache addition
if err != nil {
continue
}
op.Infof("Adding volume store %s (%s)", name, dsurl.String())
if _, err = h.volumeCache.AddStore(op, name, vs); err != nil {
op.Errorf("volume addition error %s", err)
}
spl.RegisterImporter(op, dsurl.String(), vs)
spl.RegisterExporter(op, dsurl.String(), vs)
// get the mangled store URLs that the cache uses
// #nosec: Errors unhandled.
cURL, _ := h.volumeCache.GetVolumeStore(op, name)
if cURL != nil {
spl.RegisterImporter(op, cURL.String(), vs)
spl.RegisterExporter(op, cURL.String(), vs)
}
}
}
// CreateImageStore creates a new image store
func (h *StorageHandlersImpl) CreateImageStore(params storage.CreateImageStoreParams) middleware.Responder {
op := trace.NewOperation(context.Background(), fmt.Sprintf("CreateImageStore(%s)", params.Body.Name))
name := params.Body.Name
defer trace.End(trace.Begin(fmt.Sprintf("CreateImageStore: %s", name), op))
registerImageStore := func(h *StorageHandlersImpl, name string) {
// register image store importer/export
spl.RegisterImporter(op, name, h.imageCache.DataStore)
spl.RegisterExporter(op, name, h.imageCache.DataStore)
storeURL, err := util.ImageStoreNameToURL(name)
if err == nil {
spl.RegisterImporter(op, storeURL.String(), h.imageCache.DataStore)
spl.RegisterExporter(op, storeURL.String(), h.imageCache.DataStore)
}
}
url, err := h.imageCache.CreateImageStore(op, name)
if err != nil {
if os.IsExist(err) {
registerImageStore(h, name)
return storage.NewCreateImageStoreConflict().WithPayload(
&models.Error{
Code: http.StatusConflict,
Message: "An image store with that name already exists",
})
}
return storage.NewCreateImageStoreDefault(http.StatusInternalServerError).WithPayload(
&models.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
})
}
registerImageStore(h, name)
s := &models.StoreURL{
Code: http.StatusCreated,
URL: url.String(),
}
return storage.NewCreateImageStoreCreated().WithPayload(s)
}
// GetImage retrieves an image from a store
func (h *StorageHandlersImpl) GetImage(params storage.GetImageParams) middleware.Responder {
id := params.ID
url, err := util.ImageStoreNameToURL(params.StoreName)
if err != nil {
return storage.NewGetImageDefault(http.StatusInternalServerError).WithPayload(
&models.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
})
}
op := trace.NewOperation(context.Background(), fmt.Sprintf("GetImage(%s)", id))
image, err := h.imageCache.GetImage(op, url, id)
if err != nil {
e := &models.Error{
Code: http.StatusNotFound,
Message: err.Error(),
}
return storage.NewGetImageNotFound().WithPayload(e)
}
result := convertImage(image)
return storage.NewGetImageOK().WithPayload(result)
}
// DeleteImage deletes an image from a store
func (h *StorageHandlersImpl) DeleteImage(params storage.DeleteImageParams) middleware.Responder {
ferr := func(err error, code int) middleware.Responder {
log.Errorf("DeleteImage: error %s", err.Error())
return storage.NewDeleteImageDefault(code).WithPayload(
&models.Error{
Code: int64(code),
Message: err.Error(),
})
}
imageURL, err := util.ImageURL(params.StoreName, params.ID)
if err != nil {
return ferr(err, http.StatusInternalServerError)
}
img, err := image.Parse(imageURL)
if err != nil {
return ferr(err, http.StatusInternalServerError)
}
keepNodes := make([]*url.URL, len(params.KeepNodes))
for idx, kn := range params.KeepNodes {
k, err := url.Parse(kn)
if err != nil {
return ferr(err, http.StatusInternalServerError)
}
keepNodes[idx] = k
}
op := trace.NewOperation(context.Background(), fmt.Sprintf("DeleteBranch(%s)", img.ID))
deletedImages, err := h.imageCache.DeleteBranch(op, img, keepNodes)
if err != nil {
switch {
case image.IsErrImageInUse(err):
return ferr(err, http.StatusLocked)
case os.IsNotExist(err):
return ferr(err, http.StatusNotFound)
default:
return ferr(err, http.StatusInternalServerError)
}
}
result := make([]*models.Image, len(deletedImages))
for idx, img := range deletedImages {
result[idx] = convertImage(img)
}
return storage.NewDeleteImageOK().WithPayload(result)
}
// ListImages returns a list of images in a store
func (h *StorageHandlersImpl) ListImages(params storage.ListImagesParams) middleware.Responder {
u, err := util.ImageStoreNameToURL(params.StoreName)
if err != nil {
return storage.NewListImagesDefault(http.StatusInternalServerError).WithPayload(
&models.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
})
}
op := trace.NewOperation(context.Background(), fmt.Sprintf("ListImages(%s, %q)", u.String(), params.Ids))
images, err := h.imageCache.ListImages(op, u, params.Ids)
if err != nil {
return storage.NewListImagesNotFound().WithPayload(
&models.Error{
Code: http.StatusNotFound,
Message: err.Error(),
})
}
result := make([]*models.Image, 0, len(images))
for _, image := range images {
result = append(result, convertImage(image))
}
return storage.NewListImagesOK().WithPayload(result)
}
// WriteImage writes an image to an image store
func (h *StorageHandlersImpl) WriteImage(params storage.WriteImageParams) middleware.Responder {
u, err := util.ImageStoreNameToURL(params.StoreName)
if err != nil {
return storage.NewWriteImageDefault(http.StatusInternalServerError).WithPayload(
&models.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
})
}
parent := &image.Image{
Store: u,
ID: params.ParentID,
}
var meta map[string][]byte
if params.Metadatakey != nil && params.Metadataval != nil {
meta = map[string][]byte{*params.Metadatakey: []byte(*params.Metadataval)}
}
op := trace.NewOperation(context.Background(), fmt.Sprintf("WriteImage(%s)", params.ImageID))
image, err := h.imageCache.WriteImage(op, parent, params.ImageID, meta, params.Sum, params.ImageFile)
if err != nil {
return storage.NewWriteImageDefault(http.StatusInternalServerError).WithPayload(
&models.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
})
}
i := convertImage(image)
return storage.NewWriteImageCreated().WithPayload(i)
}
//ImageJoin modifies the config spec of a container to include the specified image
func (h *StorageHandlersImpl) ImageJoin(params storage.ImageJoinParams) middleware.Responder {
op := trace.NewOperation(context.Background(), "ImageJoin %s", params.ID)
defer trace.End(trace.Begin("", op))
handle := epl.HandleFromInterface(params.Config.Handle)
if handle == nil {
err := &models.Error{Message: "Failed to get the Handle"}
return storage.NewImageJoinInternalServerError().WithPayload(err)
}
storeURL, _ := util.ImageStoreNameToURL(params.StoreName)
img, err := h.imageCache.GetImage(op, storeURL, params.ID)
if err != nil {
op.Errorf("Volumes: StorageHandler : %#v", err)
return storage.NewImageJoinNotFound().WithPayload(&models.Error{Code: http.StatusNotFound, Message: err.Error()})
}
handleprime, err := image.Join(op, handle, params.Config.DeltaID, params.Config.ImageID, params.Config.RepoName, img)
if err != nil {
op.Errorf("join image failed: %#v", err)
return storage.NewImageJoinInternalServerError().WithPayload(&models.Error{Message: err.Error()})
}
op.Debugf("image %s has been joined to %s as %s", params.ID, handle.Spec.ID(), params.Config.DeltaID)
res := &models.ImageJoinResponse{
Handle: epl.ReferenceFromHandle(handleprime),
}
return storage.NewImageJoinOK().WithPayload(res)
}
// VolumeStoresList lists the configured volume stores and their datastore path URIs.
func (h *StorageHandlersImpl) VolumeStoresList(params storage.VolumeStoresListParams) middleware.Responder {
defer trace.End(trace.Begin("storage_handlers.VolumeStoresList"))
op := trace.NewOperation(context.Background(), "VolumeStoresList")
stores, err := h.volumeCache.VolumeStoresList(op)
if err != nil {
return storage.NewVolumeStoresListInternalServerError().WithPayload(
&models.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
})
}
resp := &models.VolumeStoresListResponse{Stores: stores}
return storage.NewVolumeStoresListOK().WithPayload(resp)
}
//CreateVolume : Create a Volume
func (h *StorageHandlersImpl) CreateVolume(params storage.CreateVolumeParams) middleware.Responder {
defer trace.End(trace.Begin("storage_handlers.CreateVolume"))
//TODO: FIXME: add more errorcodes as we identify error scenarios.
storeURL, err := util.VolumeStoreNameToURL(params.VolumeRequest.Store)
if err != nil {
log.Errorf("storagehandler: VolumeStoreName error: %s", err)
return storage.NewCreateVolumeInternalServerError().WithPayload(&models.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
})
}
byteMap := make(map[string][]byte)
for key, value := range params.VolumeRequest.Metadata {
byteMap[key] = []byte(value)
}
capacity := uint64(0)
if params.VolumeRequest.Capacity < 0 {
capacity = uint64(1024) //FIXME: this should look for a default cap and set or fail here.
} else {
capacity = uint64(params.VolumeRequest.Capacity)
}
op := trace.NewOperation(context.Background(), fmt.Sprintf("VolumeCreate(%s)", params.VolumeRequest.Name))
vol, err := h.volumeCache.VolumeCreate(op, params.VolumeRequest.Name, storeURL, capacity*1024, byteMap)
if err != nil {
if os.IsExist(err) {
op.Warnf("Reusing existing volume with target identity")
return storage.NewCreateVolumeConflict().WithPayload(&models.Error{
Code: http.StatusConflict,
Message: err.Error(),
})
}
op.Errorf("storagehandler: VolumeCreate error: %#v", err)
if _, ok := err.(volume.VolumeStoreNotFoundError); ok {
return storage.NewCreateVolumeNotFound().WithPayload(&models.Error{
Code: http.StatusNotFound,
Message: err.Error(),
})
}
return storage.NewCreateVolumeInternalServerError().WithPayload(&models.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
})
}
response := volumeToCreateResponse(vol, params.VolumeRequest)
return storage.NewCreateVolumeCreated().WithPayload(&response)
}
//GetVolume : Gets a handle to a volume
func (h *StorageHandlersImpl) GetVolume(params storage.GetVolumeParams) middleware.Responder {
defer trace.End(trace.Begin(params.Name))
op := trace.NewOperation(context.Background(), fmt.Sprintf("VolumeGet(%s)", params.Name))
data, err := h.volumeCache.VolumeGet(op, params.Name)
if err == os.ErrNotExist {
return storage.NewGetVolumeNotFound().WithPayload(&models.Error{
Code: http.StatusNotFound,
Message: err.Error(),
})
}
response, err := fillVolumeModel(data)
if err != nil {
return storage.NewListVolumesInternalServerError().WithPayload(&models.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
})
}
op.Debugf("VolumeGet returned : %#v", response)
return storage.NewGetVolumeOK().WithPayload(&response)
}
//RemoveVolume : Remove a Volume from existence
func (h *StorageHandlersImpl) RemoveVolume(params storage.RemoveVolumeParams) middleware.Responder {
defer trace.End(trace.Begin("storage_handlers.RemoveVolume"))
op := trace.NewOperation(context.Background(), fmt.Sprintf("VolumeDestroy(%s)", params.Name))
err := h.volumeCache.VolumeDestroy(op, params.Name)
if err != nil {
switch {
case os.IsNotExist(err):
return storage.NewRemoveVolumeNotFound().WithPayload(&models.Error{
Message: err.Error(),
})
case volume.IsErrVolumeInUse(err):
return storage.NewRemoveVolumeConflict().WithPayload(&models.Error{
Message: err.Error(),
})
default:
return storage.NewRemoveVolumeInternalServerError().WithPayload(&models.Error{
Message: err.Error(),
})
}
}
return storage.NewRemoveVolumeOK()
}
//VolumesList : Lists available volumes for use
func (h *StorageHandlersImpl) VolumesList(params storage.ListVolumesParams) middleware.Responder {
defer trace.End(trace.Begin(""))
var result []*models.VolumeResponse
op := trace.NewOperation(context.Background(), "VolumeList")
portlayerVolumes, err := h.volumeCache.VolumesList(op)
if err != nil {
op.Error(err)
return storage.NewListVolumesInternalServerError().WithPayload(&models.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
})
}
op.Debugf("volumes fetched from list call : %#v", portlayerVolumes)
for i := range portlayerVolumes {
model, err := fillVolumeModel(portlayerVolumes[i])
if err != nil {
op.Error(err)
return storage.NewListVolumesInternalServerError().WithPayload(&models.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
})
}
result = append(result, &model)
}
op.Debugf("volumes returned from list call : %#v", result)
return storage.NewListVolumesOK().WithPayload(result)
}
//VolumeJoin : modifies the config spec of a container to mount the specified container
func (h *StorageHandlersImpl) VolumeJoin(params storage.VolumeJoinParams) middleware.Responder {
defer trace.End(trace.Begin(""))
op := trace.NewOperation(context.Background(), fmt.Sprintf("VolumeJoin(%s)", params.Name))
actualHandle := epl.GetHandle(params.JoinArgs.Handle)
//Note: Name should already be populated by now.
volume, err := h.volumeCache.VolumeGet(op, params.Name)
if err != nil {
op.Errorf("Volumes: StorageHandler : %#v", err)
return storage.NewVolumeJoinInternalServerError().WithPayload(&models.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
})
}
// NOTE: unclear to me why we are leaking this logic at this level - the volume should be able to switch Join implementations
// based on its type
switch volume.Device.DiskPath().Scheme {
case nfsScheme:
actualHandle, err = nfs.VolumeJoin(op, actualHandle, volume, params.JoinArgs.MountPath, params.JoinArgs.Flags)
case dsScheme:
actualHandle, err = vsvolume.VolumeJoin(op, actualHandle, volume, params.JoinArgs.MountPath, params.JoinArgs.Flags)
default:
err = fmt.Errorf("unknown scheme (%s) for Volume (%#v)", volume.Device.DiskPath().Scheme, *volume)
}
if err != nil {
op.Errorf("Volumes: StorageHandler : %#v", err)
return storage.NewVolumeJoinInternalServerError().WithPayload(&models.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
})
}
op.Infof("volume %s has been joined to a container", volume.ID)
return storage.NewVolumeJoinOK().WithPayload(actualHandle.String())
}
// ImportArchive takes an input tar archive and unpacks to destination
func (h *StorageHandlersImpl) ImportArchive(params storage.ImportArchiveParams) middleware.Responder {
defer trace.End(trace.Begin(""))
id := params.DeviceID
op := trace.NewOperation(context.Background(), "ImportArchive: %s", id)
filterSpec, err := archive.DecodeFilterSpec(op, params.FilterSpec)
if err != nil {
return storage.NewImportArchiveUnprocessableEntity()
}
store, ok := spl.GetImporter(params.Store)
if !ok {
op.Errorf("Failed to locate import capable store %s", params.Store)
op.Debugf("Available importers are: %+q", spl.GetImporters())
return storage.NewImportArchiveNotFound()
}
err = store.Import(op, id, filterSpec, params.Archive)
if err != nil {
op.Errorf("import failed: %s", err)
// error checking for no such file/directory
if os.IsNotExist(err) {
return storage.NewImportArchiveNotFound()
}
// error checking for internal server error from toolbox
if vsphere.IsToolBoxStateChangeErr(err) {
return storage.NewImportArchiveConflict()
}
return storage.NewExportArchiveInternalServerError()
}
return storage.NewImportArchiveOK()
}
// ExportArchive creates a tar archive and returns to caller
func (h *StorageHandlersImpl) ExportArchive(params storage.ExportArchiveParams) middleware.Responder {
defer trace.End(trace.Begin(""))
id := params.DeviceID
ancestor := ""
if params.Ancestor != nil {
ancestor = *params.Ancestor
}
op := trace.NewOperation(context.Background(), "ExportArchive: %s:%s", id, ancestor)
filterSpec, err := archive.DecodeFilterSpec(op, params.FilterSpec)
if err != nil {
return storage.NewExportArchiveUnprocessableEntity()
}
store, ok := spl.GetExporter(params.Store)
if !ok {
op.Errorf("Failed to locate export capable store %s", params.Store)
op.Debugf("Available exporters are: %+q", spl.GetExporters())
return storage.NewExportArchiveNotFound()
}
r, err := store.Export(op, id, ancestor, filterSpec, params.Data)
if err != nil {
// hickeng: we're in need of typed errors - should check for id not found for 404 return
op.Errorf("export failed: %s", err)
if r != nil {
r.Close()
}
return storage.NewExportArchiveInternalServerError()
}
return NewStreamOutputHandler("ExportArchive").WithPayload(NewFlushingReader(r), params.DeviceID, func() { r.Close() })
}
// StatPath returns file info on the target path of a container copy
func (h *StorageHandlersImpl) StatPath(params storage.StatPathParams) middleware.Responder {
defer trace.End(trace.Begin(""))
op := trace.NewOperation(context.Background(), "StatPath: %s", params.DeviceID)
filterSpec, err := archive.DecodeFilterSpec(op, params.FilterSpec)
if err != nil {
return storage.NewStatPathUnprocessableEntity()
}
if len(filterSpec.Inclusions) != 1 {
return storage.NewStatPathUnprocessableEntity()
}
store, ok := spl.GetExporter(params.Store)
if !ok {
op.Errorf("Error getting exporter: %s", err.Error())
return storage.NewStatPathNotFound()
}
dataSource, err := store.NewDataSource(op, params.DeviceID)
if err != nil {
op.Errorf("Error getting data source: %s", err.Error())
return storage.NewStatPathInternalServerError()
}
defer dataSource.Close()
fileStat, err := dataSource.Stat(op, filterSpec)
if err != nil {
if os.IsNotExist(err) {
// would like to be able to differentiate between store and files, but....
op.Debugf("Stat target did not exist: %s", err)
return storage.NewStatPathNotFound()
}
op.Errorf("Error getting datasource stats: %s", err)
return storage.NewStatPathInternalServerError()
}
modTimeBytes, err := fileStat.ModTime.GobEncode()
if err != nil {
return storage.NewStatPathUnprocessableEntity()
}
op.Debugf("found data successfully")
return storage.
NewStatPathOK().
WithMode(fileStat.Mode).
WithLinkTarget(fileStat.LinkTarget).
WithName(fileStat.Name).
WithSize(fileStat.Size).
WithModTime(string(modTimeBytes))
}
//utility functions
// convert an SPL Image to a swagger-defined Image
func convertImage(image *image.Image) *models.Image {
var parent, selfLink string
// scratch image
if image.ParentLink != nil {
parent = image.ParentLink.String()
}
if image.SelfLink != nil {
selfLink = image.SelfLink.String()
}
meta := make(map[string]string)
if image.Metadata != nil {
for k, v := range image.Metadata {
meta[k] = string(v)
}
}
return &models.Image{
ID: image.ID,
SelfLink: selfLink,
Parent: parent,
Metadata: meta,
Store: image.Store.String(),
}
}
func volumeToCreateResponse(volume *volume.Volume, model *models.VolumeRequest) models.VolumeResponse {
response := models.VolumeResponse{
Driver: model.Driver,
Name: volume.ID,
Label: volume.Label,
Store: model.Store,
Metadata: model.Metadata,
}
return response
}
func fillVolumeModel(volume *volume.Volume) (models.VolumeResponse, error) {
storeName, err := util.VolumeStoreName(volume.Store)
if err != nil {
return models.VolumeResponse{}, err
}
metadata := createMetadataMap(volume)
model := models.VolumeResponse{
Name: volume.ID,
Driver: "vsphere",
Store: storeName,
Metadata: metadata,
Label: volume.Label,
}
return model, nil
}
func createMetadataMap(volume *volume.Volume) map[string]string {
stringMap := make(map[string]string)
for k, v := range volume.Info {
stringMap[k] = string(v)
}
return stringMap
}
func createNFSVolumeStore(op trace.Operation, dsurl *url.URL, name string) (volume.VolumeStorer, error) {
var err error
uid, gid, err := parseUIDAndGID(dsurl)
if err != nil {
op.Errorf("%s", err.Error())
return nil, err
}
// XXX replace with the vch name
mnt := nfs.NewMount(dsurl, "vic", uint32(uid), uint32(gid))
vs, err := nfs.NewVolumeStore(op, name, mnt)
if err != nil {
op.Errorf("%s", err.Error())
return nil, err
}
return vs, nil
}
func parseUIDAndGID(queryURL *url.URL) (int, int, error) {
var err error
uid := nfs.DefaultUID
gid := nfs.DefaultUID
vsUID := queryURL.Query().Get(uidQueryKey)
vsGID := queryURL.Query().Get(gidQueryKey)
if vsGID == "" {
vsGID = vsUID
}
if vsUID != "" {
uid, err = strconv.Atoi(vsUID)
if err != nil {
return -1, -1, err
}
}
if vsGID != "" {
gid, err = strconv.Atoi(vsGID)
if err != nil {
return -1, -1, err
}
}
if uid < 0 {
return -1, -1, fmt.Errorf("supplied url (%s) for nfs volume store has invalid uid : (%d)", queryURL.String(), uid)
}
if gid < 0 {
return -1, -1, fmt.Errorf("supplied url (%s) for nfs volume store has invalid gid : (%d)", queryURL.String(), gid)
}
return uid, gid, nil
}
func createVsphereVolumeStore(op trace.Operation, dsurl *url.URL, name string, handlerCtx *HandlerContext) (volume.VolumeStorer, error) {
ds, err := datastore.NewHelperFromURL(op, handlerCtx.Session, dsurl)
if err != nil {
err = fmt.Errorf("cannot find datastores: %s", err)
op.Errorf("%s", err.Error())
return nil, err
}
vs, err := vsvolume.NewVolumeStore(op, name, handlerCtx.Session, ds)
if err != nil {
err = fmt.Errorf("cannot instantiate the volume store: %s", err)
op.Errorf("%s", err.Error())
return nil, err
}
return vs, nil
}

View File

@@ -1,631 +0,0 @@
// Copyright 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handlers
import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/vic/lib/apiservers/portlayer/models"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations/storage"
"github.com/vmware/vic/lib/archive"
"github.com/vmware/vic/lib/constants"
spl "github.com/vmware/vic/lib/portlayer/storage"
"github.com/vmware/vic/lib/portlayer/storage/image"
"github.com/vmware/vic/lib/portlayer/storage/volume"
"github.com/vmware/vic/lib/portlayer/util"
"github.com/vmware/vic/pkg/trace"
"github.com/vmware/vic/pkg/vsphere/vm"
)
var (
testImageID = "testImage"
testImageSum = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
testHostName, _ = os.Hostname()
testStoreName = "testStore"
testStoreURL = url.URL{
Scheme: "http",
Host: testHostName,
Path: "/" + util.ImageURLPath + "/" + testStoreName,
}
)
type MockDataStore struct {
}
type MockVolumeStore struct {
// id -> volume
db map[string]*volume.Volume
}
func NewMockVolumeStore() *MockVolumeStore {
m := &MockVolumeStore{
db: make(map[string]*volume.Volume),
}
return m
}
// Creates a volume on the given volume store, of the given size, with the given metadata.
func (m *MockVolumeStore) VolumeCreate(op trace.Operation, ID string, store *url.URL, capacityKB uint64, info map[string][]byte) (*volume.Volume, error) {
storeName, err := util.VolumeStoreName(store)
if err != nil {
return nil, err
}
selfLink, err := util.VolumeURL(storeName, ID)
if err != nil {
return nil, err
}
vol := &volume.Volume{
ID: ID,
Store: store,
SelfLink: selfLink,
}
m.db[ID] = vol
return vol, nil
}
// Get an existing volume via it's ID and volume store.
func (m *MockVolumeStore) VolumeGet(op trace.Operation, ID string) (*volume.Volume, error) {
vol, ok := m.db[ID]
if !ok {
return nil, os.ErrNotExist
}
return vol, nil
}
// Destroys a volume
func (m *MockVolumeStore) VolumeDestroy(op trace.Operation, vol *volume.Volume) error {
if _, ok := m.db[vol.ID]; !ok {
return os.ErrNotExist
}
delete(m.db, vol.ID)
return nil
}
func (m *MockVolumeStore) VolumeStoresList(op trace.Operation) (map[string]url.URL, error) {
return nil, fmt.Errorf("not implemented")
}
// Lists all volumes on the given volume store`
func (m *MockVolumeStore) VolumesList(op trace.Operation) ([]*volume.Volume, error) {
var i int
list := make([]*volume.Volume, len(m.db))
for _, v := range m.db {
t := *v
list[i] = &t
i++
}
return list, nil
}
func (m *MockVolumeStore) Export(op trace.Operation, child, ancestor string, spec *archive.FilterSpec, data bool) (io.ReadCloser, error) {
return nil, nil
}
func (m *MockVolumeStore) Import(op trace.Operation, id string, spec *archive.FilterSpec, tarstream io.ReadCloser) error {
return nil
}
func (m *MockVolumeStore) NewDataSink(op trace.Operation, id string) (spl.DataSink, error) {
return nil, nil
}
func (m *MockVolumeStore) NewDataSource(op trace.Operation, id string) (spl.DataSource, error) {
return nil, nil
}
func (m *MockVolumeStore) URL(op trace.Operation, id string) (*url.URL, error) {
return nil, nil
}
func (m *MockVolumeStore) Owners(op trace.Operation, url *url.URL, filter func(vm *mo.VirtualMachine) bool) ([]*vm.VirtualMachine, error) {
return nil, nil
}
// GetImageStore checks to see if a named image store exists and returls the
// URL to it if so or error.
func (c *MockDataStore) GetImageStore(op trace.Operation, storeName string) (*url.URL, error) {
_, err := util.ImageStoreNameToURL(storeName)
if err != nil {
return nil, err
}
return nil, os.ErrNotExist
}
func (c *MockDataStore) CreateImageStore(op trace.Operation, storeName string) (*url.URL, error) {
u, err := util.ImageStoreNameToURL(storeName)
if err != nil {
return nil, err
}
return u, nil
}
func (c *MockDataStore) DeleteImageStore(op trace.Operation, storeName string) error {
return nil
}
func (c *MockDataStore) ListImageStores(op trace.Operation) ([]*url.URL, error) {
return nil, nil
}
func (c *MockDataStore) Export(op trace.Operation, child, ancestor string, spec *archive.FilterSpec, data bool) (io.ReadCloser, error) {
return nil, nil
}
func (c *MockDataStore) Import(op trace.Operation, id string, spec *archive.FilterSpec, tarstream io.ReadCloser) error {
return nil
}
func (c *MockDataStore) NewDataSink(op trace.Operation, id string) (spl.DataSink, error) {
return nil, nil
}
func (c *MockDataStore) NewDataSource(op trace.Operation, id string) (spl.DataSource, error) {
return nil, nil
}
func (c *MockDataStore) URL(op trace.Operation, id string) (*url.URL, error) {
return nil, nil
}
func (c *MockDataStore) Owners(op trace.Operation, url *url.URL, filter func(vm *mo.VirtualMachine) bool) ([]*vm.VirtualMachine, error) {
return nil, nil
}
func (c *MockDataStore) WriteImage(op trace.Operation, parent *image.Image, ID string, meta map[string][]byte, sum string, r io.Reader) (*image.Image, error) {
storeName, err := util.ImageStoreName(parent.Store)
if err != nil {
return nil, err
}
selflink, err := util.ImageURL(storeName, ID)
if err != nil {
return nil, err
}
i := image.Image{
ID: ID,
Store: parent.Store,
ParentLink: parent.SelfLink,
SelfLink: selflink,
Metadata: meta,
}
return &i, nil
}
func (c *MockDataStore) WriteMetadata(op trace.Operation, storeName string, ID string, meta map[string][]byte) error {
return nil
}
// GetImage gets the specified image from the given store by retreiving it from the cache.
func (c *MockDataStore) GetImage(op trace.Operation, store *url.URL, ID string) (*image.Image, error) {
if ID == constants.ScratchLayerID {
return &image.Image{Store: store}, nil
}
return nil, os.ErrNotExist
}
// ListImages resturns a list of Images for a list of IDs, or all if no IDs are passed
func (c *MockDataStore) ListImages(op trace.Operation, store *url.URL, IDs []string) ([]*image.Image, error) {
return nil, fmt.Errorf("store (%s) doesn't exist", store.String())
}
func (c *MockDataStore) DeleteImage(op trace.Operation, image *image.Image) (*image.Image, error) {
return nil, nil
}
func TestCreateImageStore(t *testing.T) {
s := &StorageHandlersImpl{
imageCache: image.NewLookupCache(&MockDataStore{}),
}
store := &models.ImageStore{
Name: "testStore",
}
params := storage.CreateImageStoreParams{
Body: store,
}
result := s.CreateImageStore(params)
if !assert.NotNil(t, result) {
return
}
// try to recreate the same image store
result = s.CreateImageStore(params)
if !assert.NotNil(t, result) {
return
}
// expect 409 since it already exists
conflict := &storage.CreateImageStoreConflict{
Payload: &models.Error{
Code: http.StatusConflict,
Message: "An image store with that name already exists",
},
}
if !assert.Equal(t, conflict, result) {
return
}
}
func TestGetImage(t *testing.T) {
s := &StorageHandlersImpl{
imageCache: image.NewLookupCache(&MockDataStore{}),
}
params := &storage.GetImageParams{
ID: testImageID,
StoreName: testStoreName,
}
result := s.GetImage(*params)
if !assert.NotNil(t, result) {
return
}
op := trace.NewOperation(context.Background(), "test")
// create the image store
url, err := s.imageCache.CreateImageStore(op, testStoreName)
// TODO(jzt): these are testing NameLookupCache, do we need them here?
if !assert.Nil(t, err, "Error while creating image store") {
return
}
if !assert.Equal(t, testStoreURL.String(), url.String()) {
return
}
// try GetImage again
result = s.GetImage(*params)
if !assert.NotNil(t, result) {
return
}
// add image to store
parent := image.Image{
ID: "scratch",
SelfLink: nil,
ParentLink: nil,
Store: &testStoreURL,
}
expectedMeta := make(map[string][]byte)
expectedMeta["foo"] = []byte("bar")
// add the image to the store
image, err := s.imageCache.WriteImage(op, &parent, testImageID, expectedMeta, testImageSum, nil)
if !assert.NoError(t, err) || !assert.NotNil(t, image) {
return
}
selflink, err := util.ImageURL(testStoreName, testImageID)
if !assert.NoError(t, err) {
return
}
sl := selflink.String()
parentlink, err := util.ImageURL(testStoreName, parent.ID)
if !assert.NoError(t, err) {
return
}
p := parentlink.String()
eMeta := make(map[string]string)
eMeta["foo"] = "bar"
// expect our image back now that we've created it
expected := &storage.GetImageOK{
Payload: &models.Image{
ID: image.ID,
SelfLink: sl,
Parent: p,
Store: testStoreURL.String(),
Metadata: eMeta,
},
}
result = s.GetImage(*params)
if !assert.NotNil(t, result) {
return
}
if !assert.Equal(t, expected, result) {
return
}
}
func TestListImages(t *testing.T) {
s := &StorageHandlersImpl{
imageCache: image.NewLookupCache(&MockDataStore{}),
}
params := &storage.ListImagesParams{
StoreName: testStoreName,
}
outImages := s.ListImages(*params)
if !assert.NotNil(t, outImages) {
return
}
op := trace.NewOperation(context.Background(), "test")
// create the image store
url, err := s.imageCache.CreateImageStore(op, testStoreName)
if !assert.NoError(t, err) {
return
}
if !assert.NotNil(t, url) {
return
}
// create a set of images
images := make(map[string]*image.Image)
parent := image.Image{
ID: constants.ScratchLayerID,
}
parent.Store = &testStoreURL
for i := 1; i < 50; i++ {
id := fmt.Sprintf("id-%d", i)
img, err := s.imageCache.WriteImage(op, &parent, id, nil, testImageSum, nil)
if !assert.NoError(t, err) {
return
}
if !assert.NotNil(t, img) {
return
}
images[id] = img
}
// List all images
outImages = s.ListImages(*params)
assert.IsType(t, &storage.ListImagesOK{}, outImages)
assert.Equal(t, len(outImages.(*storage.ListImagesOK).Payload), len(images))
for _, img := range outImages.(*storage.ListImagesOK).Payload {
_, ok := images[img.ID]
if !assert.True(t, ok) {
return
}
}
// List specific images
var ids []string
// query for odd-numbered image ids
for i := 1; i < 50; i += 2 {
ids = append(ids, fmt.Sprintf("id-%d", i))
}
params.Ids = ids
outImages = s.ListImages(*params)
assert.IsType(t, &storage.ListImagesOK{}, outImages)
assert.Equal(t, len(ids), len(outImages.(*storage.ListImagesOK).Payload))
outmap := make(map[string]*models.Image)
for _, image := range outImages.(*storage.ListImagesOK).Payload {
outmap[image.ID] = image
}
// ensure no even-numbered image ids in our result
for i := 2; i < 50; i += 2 {
id := fmt.Sprintf("id-%d", i)
_, ok := outmap[id]
if !assert.False(t, ok) {
return
}
}
}
func TestWriteImage(t *testing.T) {
ic := image.NewLookupCache(&MockDataStore{})
// create image store
op := trace.NewOperation(context.Background(), "test")
_, err := ic.CreateImageStore(op, testStoreName)
if err != nil {
return
}
s := &StorageHandlersImpl{
imageCache: ic,
}
eMeta := make(map[string]string)
eMeta["foo"] = "bar"
name := new(string)
val := new(string)
*name = "foo"
*val = eMeta["foo"]
params := &storage.WriteImageParams{
StoreName: testStoreName,
ImageID: testImageID,
ParentID: "scratch",
Sum: testImageSum,
Metadatakey: name,
Metadataval: val,
ImageFile: nil,
}
parentlink, err := util.ImageURL(testStoreName, params.ParentID)
if !assert.NoError(t, err) {
return
}
p := parentlink.String()
selflink, err := util.ImageURL(testStoreName, testImageID)
if !assert.NoError(t, err) {
return
}
sl := selflink.String()
expected := &storage.WriteImageCreated{
Payload: &models.Image{
ID: testImageID,
Parent: p,
SelfLink: sl,
Store: testStoreURL.String(),
Metadata: eMeta,
},
}
result := s.WriteImage(*params)
if !assert.NotNil(t, result) {
return
}
if !assert.Equal(t, expected, result) {
return
}
}
func TestVolumeCreate(t *testing.T) {
op := trace.NewOperation(context.Background(), "test")
volCache := volume.NewVolumeLookupCache(op)
testStore := NewMockVolumeStore()
_, err := volCache.AddStore(op, "testStore", testStore)
if !assert.NoError(t, err) {
return
}
handler := StorageHandlersImpl{
volumeCache: volCache,
}
model := models.VolumeRequest{}
model.Store = "testStore"
model.Name = "testVolume"
model.Capacity = 1
model.Driver = "vsphere"
model.DriverArgs = make(map[string]string)
model.DriverArgs["stuff"] = "things"
model.Metadata = make(map[string]string)
params := storage.NewCreateVolumeParams()
params.VolumeRequest = &model
handler.CreateVolume(params)
testVolume, err := testStore.VolumeGet(op, model.Name)
if !assert.NoError(t, err) {
return
}
if !assert.NotNil(t, testVolume) {
return
}
testVolumeStoreName, err := util.VolumeStoreName(testVolume.Store)
if !assert.NoError(t, err) {
return
}
if !assert.Equal(t, "testStore", testVolumeStoreName) {
return
}
if !assert.Equal(t, "testVolume", testVolume.ID) {
return
}
}
func TestParseUIDAndGID(t *testing.T) {
positiveCases := []url.URL{
{
Scheme: "nfs",
Host: "testURL",
RawQuery: "uid=1234&gid=5678",
Path: "/test/path",
},
{
Scheme: "nfs",
Host: "testURL",
RawQuery: "uid=00000000000000&gid=00000000000000000000000000",
Path: "/test/path",
},
{
Scheme: "nfs",
Host: "testURL",
RawQuery: "uid=321321321&gid=123123123",
Path: "/test/path",
},
{
Scheme: "nfs",
Host: "testURL",
RawQuery: "uid=0&gid=0",
Path: "/test/path",
},
{
Scheme: "nfs",
Host: "testURL",
RawQuery: "uid=&gid=",
Path: "/test/path",
},
}
negativeCases := []url.URL{
{
Scheme: "nfs",
Host: "testURL",
RawQuery: "uid=Hello&gid=World",
Path: "/test/path",
},
{
Scheme: "nfs",
Host: "testURL",
RawQuery: "uid=ASKJHG#!@#LJK$&gid=!@#$$%#@@!",
Path: "/test/path",
},
{
Scheme: "nfs",
Host: "testURL",
RawQuery: "uid=9999999999999999999999999999999999999999999999999&gid=7777777777777777777777777777777777777777777777777777777",
Path: "/test/path",
},
}
for _, v := range positiveCases {
testUID, testGID, err := parseUIDAndGID(&v)
assert.Nil(t, err, v.String())
assert.NotEqual(t, -1, testUID, v.String())
assert.NotEqual(t, -1, testGID, v.String())
}
for _, v := range negativeCases {
testUID, testGID, err := parseUIDAndGID(&v)
assert.NotNil(t, err, v.String())
assert.Equal(t, -1, testUID, v.String())
assert.Equal(t, -1, testGID, v.String())
}
}

View File

@@ -1,231 +0,0 @@
// Copyright 2016-2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handlers
import (
"bytes"
"fmt"
"io"
"net/http"
log "github.com/Sirupsen/logrus"
"github.com/go-openapi/runtime"
"github.com/vmware/vic/pkg/trace"
)
// StreamOutputHandler is a custom return handler that provides common
// stream handling across the API.
type StreamOutputHandler struct {
outputStream *FlushingReader
id string
outputName string
onHTTPClose func() // clean up func called when transport closed
}
// NewStreamOutputHandler creates StreamOutputHandler with default headers values
func NewStreamOutputHandler(name string) *StreamOutputHandler {
return &StreamOutputHandler{outputName: name}
}
// WithPayload adds the payload to the container set stdin internal server error response
func (s *StreamOutputHandler) WithPayload(payload *FlushingReader, id string, cleanup func()) *StreamOutputHandler {
s.outputStream = payload
s.id = id
s.onHTTPClose = cleanup
return s
}
// WriteResponse to the client
func (s *StreamOutputHandler) WriteResponse(rw http.ResponseWriter, producer runtime.Producer) {
defer trace.End(trace.Begin(fmt.Sprintf("Stream of %s:%s", s.outputName, s.id)))
rw.WriteHeader(http.StatusOK)
if f, ok := rw.(http.Flusher); ok {
f.Flush()
s.outputStream.AddFlusher(f)
}
// prevent us from being dependent on client closing the connection once we've copied
// all available data.
done := make(chan bool)
defer close(done)
if s.onHTTPClose != nil {
notify := rw.(http.CloseNotifier).CloseNotify()
go func() {
// continue on either
select {
case <-notify:
case <-done:
}
// execute cleanup function
s.onHTTPClose()
log.Debugf("Completed stream cleanup for %s:%s", s.outputName, s.id)
}()
}
_, err := io.Copy(rw, s.outputStream)
if err != nil {
log.Errorf("Error streaming %s for %s, total unwrapped bytes: %d, %s", s.outputName, s.id, s.outputStream.totalBytes, err)
} else {
log.Debugf("Finished streaming %s for %s (unwrapped bytes: %d)", s.outputName, s.id, s.outputStream.totalBytes)
}
}
// closePipe is a convenience function for closing the event stream pipe
func closePipe(pipeReader *io.PipeReader, pipeWriter *io.PipeWriter) {
if pipeReader != nil {
pipeReader.Close()
}
if pipeWriter != nil {
pipeWriter.Close()
}
}
// GenericFlusher is a custom reader to allow us to detach cleanly during an io.Copy
type GenericFlusher interface {
Flush()
}
type FlushingReader struct {
io.Reader
io.WriterTo
flusher GenericFlusher
initBytes []byte
totalBytes uint64
}
func NewFlushingReader(rdr io.Reader) *FlushingReader {
return &FlushingReader{Reader: rdr, flusher: nil, initBytes: nil}
}
func NewFlushingReaderWithInitBytes(rdr io.Reader, initBytes []byte) *FlushingReader {
return &FlushingReader{Reader: rdr, flusher: nil, initBytes: initBytes}
}
func (d *FlushingReader) AddFlusher(flusher GenericFlusher) {
d.flusher = flusher
}
// readDetectInit() is used by WriteTo() which is used by io.Copy. It attempts
// to detect a init byte buffer. If it finds that init byte sequence, it is
// ignored. This reader does not care about the init sequeunce. The init sequence
// maybe used by the higher level interaction, which in this case is the Swagger
// establishing initial connection for stdin.
//
// Panics if the buf is smaller than the initBytes
func (d *FlushingReader) readDetectInit(buf []byte) (int, error) {
initLen := len(d.initBytes)
// fast path - len(nil) return 0
if initLen == 0 {
return d.Read(buf)
}
// make sure we have enough room
if len(buf) < initLen {
panic("Read buffer is smaller than the initialization byte sequence")
}
total := 0
upto := 0
for total < initLen {
nr, err := d.Read(buf[total:])
if nr > 0 {
total += nr
// we are only interested with the first initLen bytes
upto = total
if upto > initLen {
upto = initLen
}
if bytes.Compare(d.initBytes[0:upto], buf[0:upto]) != 0 {
// First bytes aren't part of init bytes so client must not be
// the docker personality so break and ignore looking for the
// init bytes.
log.Debugf("Did not find primer bytes, stopping watch")
return total, err
}
}
if err != nil && total < initLen {
log.Debugf("Primer bytes read %d bytes, err %s, stopping watch", nr, err)
return 0, err
}
}
// would have returned in the compare clause if not matching init bytes
copy(buf[0:], buf[initLen:])
log.Debugf("Found primer bytes, port layer client might be personality server")
// no risk of returning <0
return total - initLen, nil
}
// WriteTo is derived from go's io.Copy. We use a smaller buffer so as to not hold up
// writing out data. Go's version allocates 32k, and the Read will wait till
// buffer is filled (unless EOF is encountered). Also, we force a flush if
// a flusher is added. We've seen cases where the last bit of data for a
// screen doesn't reach the docker engine api server. The flush solves that
// issue.
func (d *FlushingReader) WriteTo(w io.Writer) (written int64, err error) {
buf := make([]byte, ioCopyBufferSize)
defer func() {
total := d.totalBytes + uint64(written)
if total >= d.totalBytes {
d.totalBytes = total
return
}
log.Debug("Restarting total byte record for %p from zero, current total: %d", d, d.totalBytes)
d.totalBytes = uint64(written)
}()
nr, er := d.readDetectInit(buf)
for {
if nr > 0 {
nw, ew := w.Write(buf[0:nr])
if d.flusher != nil {
d.flusher.Flush()
}
if nw > 0 {
written += int64(nw)
}
if ew != nil {
err = ew
break
}
if nr != nw {
err = io.ErrShortWrite
break
}
}
// it's safe to ignore ErrClosedPipe -- encountered when
// you close the pipe that is feeding the flushingReader
if er == io.EOF || er == io.ErrClosedPipe {
break
}
if er != nil {
err = er
break
}
nr, er = d.Read(buf)
}
return written, err
}

View File

@@ -1,272 +0,0 @@
// Copyright 2016-2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handlers
import (
"context"
"strings"
"github.com/go-openapi/runtime/middleware"
"github.com/vmware/vic/lib/apiservers/portlayer/models"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations"
"github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations/tasks"
"github.com/vmware/vic/lib/config/executor"
"github.com/vmware/vic/lib/portlayer/exec"
"github.com/vmware/vic/lib/portlayer/task"
"github.com/vmware/vic/pkg/trace"
"github.com/vmware/vic/pkg/uid"
)
// TaskHandlersImpl is the receiver for all of the task handler methods
type TaskHandlersImpl struct {
}
func (handler *TaskHandlersImpl) Configure(api *operations.PortLayerAPI, _ *HandlerContext) {
api.TasksJoinHandler = tasks.JoinHandlerFunc(handler.JoinHandler)
api.TasksBindHandler = tasks.BindHandlerFunc(handler.BindHandler)
api.TasksUnbindHandler = tasks.UnbindHandlerFunc(handler.UnbindHandler)
api.TasksRemoveHandler = tasks.RemoveHandlerFunc(handler.RemoveHandler)
api.TasksInspectHandler = tasks.InspectHandlerFunc(handler.InspectHandler)
api.TasksWaitHandler = tasks.WaitHandlerFunc(handler.WaitHandler)
}
// JoinHandler calls the Join
func (handler *TaskHandlersImpl) JoinHandler(params tasks.JoinParams) middleware.Responder {
defer trace.End(trace.Begin(""))
op := trace.NewOperation(context.Background(), "task.Join(%s, %s)", params.Config.Handle, params.Config.ID)
handle := exec.HandleFromInterface(params.Config.Handle)
if handle == nil {
err := &models.Error{Message: "Failed to get the Handle"}
return tasks.NewJoinInternalServerError().WithPayload(err)
}
// TODO: ensure uniqueness of ID - this is already an issue with containercreate now we're not using it as
// the VM name and cannot rely on vSphere for uniqueness guarantee
id := params.Config.ID
if id == "" {
id = uid.New().String()
}
op.Debugf("ID: %#v", id)
op.Debugf("Path: %#v", params.Config.Path)
op.Debugf("WorkingDir: %#v", params.Config.WorkingDir)
op.Debugf("OpenStdin: %#v", params.Config.OpenStdin)
op.Debugf("Attach: %#v", params.Config.Attach)
op.Debugf("User: %s", params.Config.User)
sessionConfig := &executor.SessionConfig{
Common: executor.Common{
ExecutionEnvironment: params.Config.Namespace,
ID: id,
},
Tty: params.Config.Tty,
Attach: params.Config.Attach,
OpenStdin: params.Config.OpenStdin,
Cmd: executor.Cmd{
Env: params.Config.Env,
Dir: params.Config.WorkingDir,
Path: params.Config.Path,
Args: append([]string{params.Config.Path}, params.Config.Args...),
},
StopSignal: params.Config.StopSignal,
}
// parsing user
if params.Config.User != "" {
parts := strings.Split(params.Config.User, ":")
if len(parts) > 0 {
sessionConfig.User = parts[0]
}
if len(parts) > 1 {
sessionConfig.Group = parts[1]
}
}
handleprime, err := task.Join(&op, handle, sessionConfig)
if err != nil {
op.Errorf("%s", err.Error())
return tasks.NewJoinInternalServerError().WithPayload(
&models.Error{Message: err.Error()},
)
}
res := &models.TaskJoinResponse{
ID: id,
Handle: exec.ReferenceFromHandle(handleprime),
}
return tasks.NewJoinOK().WithPayload(res)
}
// BindHandler calls the Bind
func (handler *TaskHandlersImpl) BindHandler(params tasks.BindParams) middleware.Responder {
defer trace.End(trace.Begin(""))
op := trace.NewOperation(context.Background(), "task.Bind(%s, %s)", params.Config.Handle, params.Config.ID)
handle := exec.HandleFromInterface(params.Config.Handle)
if handle == nil {
err := &models.Error{Message: "Failed to get the Handle"}
return tasks.NewBindInternalServerError().WithPayload(err)
}
handleprime, err := task.Bind(&op, handle, params.Config.ID)
if err != nil {
op.Errorf("%s", err.Error())
return tasks.NewBindInternalServerError().WithPayload(
&models.Error{Message: err.Error()},
)
}
res := &models.TaskBindResponse{
Handle: exec.ReferenceFromHandle(handleprime),
}
return tasks.NewBindOK().WithPayload(res)
}
// UnbindHandler calls the Unbind
func (handler *TaskHandlersImpl) UnbindHandler(params tasks.UnbindParams) middleware.Responder {
defer trace.End(trace.Begin(""))
op := trace.NewOperation(context.Background(), "task.Unbind(%s, %s)", params.Config.Handle, params.Config.ID)
handle := exec.HandleFromInterface(params.Config.Handle)
if handle == nil {
err := &models.Error{Message: "Failed to get the Handle"}
return tasks.NewUnbindInternalServerError().WithPayload(err)
}
handleprime, err := task.Unbind(&op, handle, params.Config.ID)
if err != nil {
op.Errorf("%s", err.Error())
return tasks.NewUnbindInternalServerError().WithPayload(
&models.Error{Message: err.Error()},
)
}
res := &models.TaskUnbindResponse{
Handle: exec.ReferenceFromHandle(handleprime),
}
return tasks.NewUnbindOK().WithPayload(res)
}
// RemoveHandler calls remove
func (handler *TaskHandlersImpl) RemoveHandler(params tasks.RemoveParams) middleware.Responder {
defer trace.End(trace.Begin(""))
op := trace.NewOperation(context.Background(), "task.Remove(%s, %s)", params.Config.Handle, params.Config.ID)
handle := exec.HandleFromInterface(params.Config.Handle)
if handle == nil {
err := &models.Error{Message: "Failed to get the Handle"}
return tasks.NewRemoveInternalServerError().WithPayload(err)
}
handleprime, err := task.Remove(&op, handle, params.Config.ID)
if err != nil {
op.Errorf("%s", err.Error())
return tasks.NewRemoveInternalServerError().WithPayload(
&models.Error{Message: err.Error()},
)
}
res := &models.TaskRemoveResponse{
Handle: exec.ReferenceFromHandle(handleprime),
}
return tasks.NewRemoveOK().WithPayload(res)
}
// InspectHandler calls inspect
func (handler *TaskHandlersImpl) InspectHandler(params tasks.InspectParams) middleware.Responder {
defer trace.End(trace.Begin(""))
op := trace.NewOperation(context.Background(), "task.Inspect(%s, %s)", params.Config.Handle, params.Config.ID)
handle := exec.HandleFromInterface(params.Config.Handle)
if handle == nil {
err := &models.Error{Message: "Failed to get the Handle"}
return tasks.NewInspectInternalServerError().WithPayload(err)
}
t, err := task.Inspect(&op, handle, params.Config.ID)
if err != nil {
op.Errorf("%s", err.Error())
if _, ok := err.(task.TaskPowerStateError); ok {
return tasks.NewInspectConflict().WithPayload(
&models.Error{Message: err.Error()},
)
}
return tasks.NewInspectInternalServerError().WithPayload(
&models.Error{Message: err.Error()},
)
}
op.Debugf("ID: %#v", t.ID)
op.Debugf("Path: %#v", t.Cmd.Path)
op.Debugf("Args: %#v", t.Cmd.Args)
op.Debugf("Running: %#v", t.StartTime)
op.Debugf("ExitCode: %#v", t.ExitStatus)
res := &models.TaskInspectResponse{
ID: t.ID,
Running: t.Started == "true",
ExitCode: int64(t.ExitStatus),
ProcessConfig: &models.ProcessConfig{
ExecPath: t.Cmd.Path,
ExecArgs: t.Cmd.Args,
},
Tty: t.Tty,
User: t.User,
OpenStdin: t.OpenStdin,
OpenStdout: t.Attach,
OpenStderr: t.Attach,
Pid: 0,
}
// report launch error if we failed
if t.Started != "" && t.Started != "true" {
res.ProcessConfig.ErrorMsg = t.Started
}
return tasks.NewInspectOK().WithPayload(res)
}
// WaitHandler calls wait
func (handler *TaskHandlersImpl) WaitHandler(params tasks.WaitParams) middleware.Responder {
defer trace.End(trace.Begin(""))
op := trace.NewOperation(context.Background(), "task.Wait(%s, %s)", params.Config.Handle, params.Config.ID)
handle := exec.HandleFromInterface(params.Config.Handle)
if handle == nil {
err := &models.Error{Message: "Failed to get the Handle"}
return tasks.NewInspectInternalServerError().WithPayload(err)
}
// wait task to set started field to something
err := task.Wait(&op, handle, params.Config.ID)
if err != nil {
op.Errorf("%s", err.Error())
return tasks.NewWaitInternalServerError().WithPayload(
&models.Error{Message: err.Error()},
)
}
return tasks.NewWaitOK()
}

View File

@@ -1,34 +0,0 @@
// Copyright 2016 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package options
import "time"
type PortLayerOptionsType struct {
SDK string `long:"sdk" description:"SDK URL or proxy" env:"VC_URL" required:"true"`
Cert string `long:"cert" description:"Client certificate" env:"VC_CERTIFICATE"`
Key string `long:"key" description:"Private key file" env:"VC_PRIVATE_KEY"`
Insecure bool `long:"insecure" default:"false" description:"Skip verification of server certificate" env:"VC_INSECURE"`
Keepalive time.Duration `long:"keepalive" default:"20s" description:"Session timeout" env:"VC_KEEPALIVE"`
DatacenterPath string `long:"datacenter" default:"/ha-datacenter" description:"Datacenter path" env:"DC_PATH" required:"true"`
ClusterPath string `long:"cluster" default:"" description:"Cluster path" env:"CS_PATH" required:"true"`
PoolPath string `long:"pool" default:"" description:"Resource pool path" env:"POOL_PATH" required:"true"`
DatastorePath string `long:"datastore" default:"/ha-datacenter/datastore/*" description:"Datastore path" env:"DS_PATH" required:"true"`
}
var (
PortLayerOptions = new(PortLayerOptionsType)
)

File diff suppressed because it is too large Load Diff