Compare commits

...

6 Commits

Author SHA1 Message Date
Chris Lu dcd6f8e056 master controller 2020-10-14 21:29:18 -07:00
Chris Lu b1706bd2ed make manifests 2020-10-14 20:15:06 -07:00
Chris Lu 9859b72c74 add changes to MasterSpec and MasterStatus 2020-10-14 20:14:30 -07:00
Chris Lu b346842ea3 operator-sdk create api --group seaweed --version v1 --kind Master --resource=true --controller=true 2020-10-14 20:09:08 -07:00
Chris Lu 15b319e331 prepare env 2020-10-14 20:08:09 -07:00
Chris Lu 45d81b6928 operator-sdk init --domain=seaweedfs.com 2020-10-14 19:51:59 -07:00
31 changed files with 278 additions and 1268 deletions

View File

@ -12,7 +12,7 @@ endif
BUNDLE_METADATA_OPTS ?= $(BUNDLE_CHANNELS) $(BUNDLE_DEFAULT_CHANNEL) BUNDLE_METADATA_OPTS ?= $(BUNDLE_CHANNELS) $(BUNDLE_DEFAULT_CHANNEL)
# Image URL to use all building/pushing image targets # Image URL to use all building/pushing image targets
IMG ?= controller:latest IMG ?= seaweedfs/operator:latest
# Produce CRDs that work back to Kubernetes 1.11 (no version conversion) # Produce CRDs that work back to Kubernetes 1.11 (no version conversion)
CRD_OPTIONS ?= "crd:trivialVersions=true" CRD_OPTIONS ?= "crd:trivialVersions=true"
@ -26,8 +26,11 @@ endif
all: manager all: manager
# Run tests # Run tests
ENVTEST_ASSETS_DIR=$(shell pwd)/testbin
test: generate fmt vet manifests test: generate fmt vet manifests
go test ./... -coverprofile cover.out mkdir -p ${ENVTEST_ASSETS_DIR}
test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/master/hack/setup-envtest.sh
source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); go test ./... -coverprofile cover.out
# Build manager binary # Build manager binary
manager: generate fmt vet manager: generate fmt vet

View File

@ -3,7 +3,7 @@ layout: go.kubebuilder.io/v2
repo: github.com/seaweedfs/seaweedfs-operator repo: github.com/seaweedfs/seaweedfs-operator
resources: resources:
- group: seaweed - group: seaweed
kind: Seaweed kind: Master
version: v1 version: v1
version: 3-alpha version: 3-alpha
plugins: plugins:

View File

@ -20,3 +20,9 @@ $ make run ENABLE_WEBHOOKS=false
$ kubectl apply -f config/samples/seaweed_v1_seaweed.yaml $ kubectl apply -f config/samples/seaweed_v1_seaweed.yaml
``` ```
## Create API and Controller
Here are the commands used to create customer resource definition (CRD)
```
operator-sdk create api --group seaweed --version v1 --kind Master --resource=true --controller=true
```

View File

@ -23,51 +23,43 @@ import (
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// SeaweedSpec defines the desired state of Seaweed // MasterSpec defines the desired state of Master
type SeaweedSpec struct { type MasterSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file // Important: Run "make" to regenerate code after modifying this file
// MetricsAddress is Prometheus gateway address // +kubebuilder:validation:Minimum=0
MetricsAddress string `json:"metricsAddress,omitempty"` // Size is the size of the master deployment
Size int32 `json:"size"`
// VolumeServerCount is the number of volume servers, default to 1
VolumeServerCount int `json:"volumeServerCount,omitempty"`
// FilerCount is the number of filers, default to 1
FilerCount int `json:"filerCount,omitempty"`
// S3Count is the number of s3, default to 1
S3Count int `json:"s3Count,omitempty"`
} }
// SeaweedStatus defines the observed state of Seaweed // MasterStatus defines the observed state of Master
type SeaweedStatus struct { type MasterStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Nodes are the names of the master pods
// Important: Run "make" to regenerate code after modifying this file Nodes []string `json:"nodes"`
} }
// +kubebuilder:object:root=true // +kubebuilder:object:root=true
// +kubebuilder:subresource:status // +kubebuilder:subresource:status
// Seaweed is the Schema for the seaweeds API // Master is the Schema for the masters API
type Seaweed struct { type Master struct {
metav1.TypeMeta `json:",inline"` metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"` metav1.ObjectMeta `json:"metadata,omitempty"`
Spec SeaweedSpec `json:"spec,omitempty"` Spec MasterSpec `json:"spec,omitempty"`
Status SeaweedStatus `json:"status,omitempty"` Status MasterStatus `json:"status,omitempty"`
} }
// +kubebuilder:object:root=true // +kubebuilder:object:root=true
// SeaweedList contains a list of Seaweed // MasterList contains a list of Master
type SeaweedList struct { type MasterList struct {
metav1.TypeMeta `json:",inline"` metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"` metav1.ListMeta `json:"metadata,omitempty"`
Items []Seaweed `json:"items"` Items []Master `json:"items"`
} }
func init() { func init() {
SchemeBuilder.Register(&Seaweed{}, &SeaweedList{}) SchemeBuilder.Register(&Master{}, &MasterList{})
} }

View File

@ -25,26 +25,26 @@ import (
) )
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Seaweed) DeepCopyInto(out *Seaweed) { func (in *Master) DeepCopyInto(out *Master) {
*out = *in *out = *in
out.TypeMeta = in.TypeMeta out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
out.Spec = in.Spec out.Spec = in.Spec
out.Status = in.Status in.Status.DeepCopyInto(&out.Status)
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Seaweed. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Master.
func (in *Seaweed) DeepCopy() *Seaweed { func (in *Master) DeepCopy() *Master {
if in == nil { if in == nil {
return nil return nil
} }
out := new(Seaweed) out := new(Master)
in.DeepCopyInto(out) in.DeepCopyInto(out)
return out return out
} }
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. // DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *Seaweed) DeepCopyObject() runtime.Object { func (in *Master) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil { if c := in.DeepCopy(); c != nil {
return c return c
} }
@ -52,31 +52,31 @@ func (in *Seaweed) DeepCopyObject() runtime.Object {
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SeaweedList) DeepCopyInto(out *SeaweedList) { func (in *MasterList) DeepCopyInto(out *MasterList) {
*out = *in *out = *in
out.TypeMeta = in.TypeMeta out.TypeMeta = in.TypeMeta
in.ListMeta.DeepCopyInto(&out.ListMeta) in.ListMeta.DeepCopyInto(&out.ListMeta)
if in.Items != nil { if in.Items != nil {
in, out := &in.Items, &out.Items in, out := &in.Items, &out.Items
*out = make([]Seaweed, len(*in)) *out = make([]Master, len(*in))
for i := range *in { for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i]) (*in)[i].DeepCopyInto(&(*out)[i])
} }
} }
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SeaweedList. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MasterList.
func (in *SeaweedList) DeepCopy() *SeaweedList { func (in *MasterList) DeepCopy() *MasterList {
if in == nil { if in == nil {
return nil return nil
} }
out := new(SeaweedList) out := new(MasterList)
in.DeepCopyInto(out) in.DeepCopyInto(out)
return out return out
} }
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. // DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *SeaweedList) DeepCopyObject() runtime.Object { func (in *MasterList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil { if c := in.DeepCopy(); c != nil {
return c return c
} }
@ -84,31 +84,36 @@ func (in *SeaweedList) DeepCopyObject() runtime.Object {
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SeaweedSpec) DeepCopyInto(out *SeaweedSpec) { func (in *MasterSpec) DeepCopyInto(out *MasterSpec) {
*out = *in *out = *in
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SeaweedSpec. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MasterSpec.
func (in *SeaweedSpec) DeepCopy() *SeaweedSpec { func (in *MasterSpec) DeepCopy() *MasterSpec {
if in == nil { if in == nil {
return nil return nil
} }
out := new(SeaweedSpec) out := new(MasterSpec)
in.DeepCopyInto(out) in.DeepCopyInto(out)
return out return out
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SeaweedStatus) DeepCopyInto(out *SeaweedStatus) { func (in *MasterStatus) DeepCopyInto(out *MasterStatus) {
*out = *in *out = *in
if in.Nodes != nil {
in, out := &in.Nodes, &out.Nodes
*out = make([]string, len(*in))
copy(*out, *in)
}
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SeaweedStatus. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MasterStatus.
func (in *SeaweedStatus) DeepCopy() *SeaweedStatus { func (in *MasterStatus) DeepCopy() *MasterStatus {
if in == nil { if in == nil {
return nil return nil
} }
out := new(SeaweedStatus) out := new(MasterStatus)
in.DeepCopyInto(out) in.DeepCopyInto(out)
return out return out
} }

View File

@ -6,20 +6,20 @@ metadata:
annotations: annotations:
controller-gen.kubebuilder.io/version: v0.3.0 controller-gen.kubebuilder.io/version: v0.3.0
creationTimestamp: null creationTimestamp: null
name: seaweeds.seaweed.seaweedfs.com name: masters.seaweed.seaweedfs.com
spec: spec:
group: seaweed.seaweedfs.com group: seaweed.seaweedfs.com
names: names:
kind: Seaweed kind: Master
listKind: SeaweedList listKind: MasterList
plural: seaweeds plural: masters
singular: seaweed singular: master
scope: Namespaced scope: Namespaced
subresources: subresources:
status: {} status: {}
validation: validation:
openAPIV3Schema: openAPIV3Schema:
description: Seaweed is the Schema for the seaweeds API description: Master is the Schema for the masters API
properties: properties:
apiVersion: apiVersion:
description: 'APIVersion defines the versioned schema of this representation description: 'APIVersion defines the versioned schema of this representation
@ -34,24 +34,26 @@ spec:
metadata: metadata:
type: object type: object
spec: spec:
description: SeaweedSpec defines the desired state of Seaweed description: MasterSpec defines the desired state of Master
properties: properties:
filerCount: size:
description: FilerCount is the number of filers, default to 1 description: Size is the size of the master deployment
type: integer format: int32
metricsAddress: minimum: 0
description: MetricsAddress is Prometheus gateway address
type: string
s3Count:
description: S3Count is the number of s3, default to 1
type: integer
volumeServerCount:
description: VolumeServerCount is the number of volume servers, default
to 1
type: integer type: integer
required:
- size
type: object type: object
status: status:
description: SeaweedStatus defines the observed state of Seaweed description: MasterStatus defines the observed state of Master
properties:
nodes:
description: Nodes are the names of the master pods
items:
type: string
type: array
required:
- nodes
type: object type: object
type: object type: object
version: v1 version: v1

View File

@ -2,18 +2,18 @@
# since it depends on service name and namespace that are out of this kustomize package. # since it depends on service name and namespace that are out of this kustomize package.
# It should be run by config/default # It should be run by config/default
resources: resources:
- bases/seaweed.seaweedfs.com_seaweeds.yaml - bases/seaweed.seaweedfs.com_masters.yaml
# +kubebuilder:scaffold:crdkustomizeresource # +kubebuilder:scaffold:crdkustomizeresource
patchesStrategicMerge: patchesStrategicMerge:
# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix. # [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix.
# patches here are for enabling the conversion webhook for each CRD # patches here are for enabling the conversion webhook for each CRD
#- patches/webhook_in_seaweeds.yaml #- patches/webhook_in_masters.yaml
# +kubebuilder:scaffold:crdkustomizewebhookpatch # +kubebuilder:scaffold:crdkustomizewebhookpatch
# [CERTMANAGER] To enable webhook, uncomment all the sections with [CERTMANAGER] prefix. # [CERTMANAGER] To enable webhook, uncomment all the sections with [CERTMANAGER] prefix.
# patches here are for enabling the CA injection for each CRD # patches here are for enabling the CA injection for each CRD
#- patches/cainjection_in_seaweeds.yaml #- patches/cainjection_in_masters.yaml
# +kubebuilder:scaffold:crdkustomizecainjectionpatch # +kubebuilder:scaffold:crdkustomizecainjectionpatch
# the following config is for teaching kustomize how to do kustomization for CRDs. # the following config is for teaching kustomize how to do kustomization for CRDs.

View File

@ -5,4 +5,4 @@ kind: CustomResourceDefinition
metadata: metadata:
annotations: annotations:
cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME)
name: seaweeds.seaweed.seaweedfs.com name: masters.seaweed.seaweedfs.com

View File

@ -3,7 +3,7 @@
apiVersion: apiextensions.k8s.io/v1beta1 apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition kind: CustomResourceDefinition
metadata: metadata:
name: seaweeds.seaweed.seaweedfs.com name: masters.seaweed.seaweedfs.com
spec: spec:
conversion: conversion:
strategy: Webhook strategy: Webhook

View File

@ -1,13 +1,13 @@
# permissions for end users to edit seaweeds. # permissions for end users to edit masters.
apiVersion: rbac.authorization.k8s.io/v1 apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole kind: ClusterRole
metadata: metadata:
name: seaweed-editor-role name: master-editor-role
rules: rules:
- apiGroups: - apiGroups:
- seaweed.seaweedfs.com - seaweed.seaweedfs.com
resources: resources:
- seaweeds - masters
verbs: verbs:
- create - create
- delete - delete
@ -19,6 +19,6 @@ rules:
- apiGroups: - apiGroups:
- seaweed.seaweedfs.com - seaweed.seaweedfs.com
resources: resources:
- seaweeds/status - masters/status
verbs: verbs:
- get - get

View File

@ -1,13 +1,13 @@
# permissions for end users to view seaweeds. # permissions for end users to view masters.
apiVersion: rbac.authorization.k8s.io/v1 apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole kind: ClusterRole
metadata: metadata:
name: seaweed-viewer-role name: master-viewer-role
rules: rules:
- apiGroups: - apiGroups:
- seaweed.seaweedfs.com - seaweed.seaweedfs.com
resources: resources:
- seaweeds - masters
verbs: verbs:
- get - get
- list - list
@ -15,6 +15,6 @@ rules:
- apiGroups: - apiGroups:
- seaweed.seaweedfs.com - seaweed.seaweedfs.com
resources: resources:
- seaweeds/status - masters/status
verbs: verbs:
- get - get

View File

@ -9,7 +9,7 @@ rules:
- apiGroups: - apiGroups:
- seaweed.seaweedfs.com - seaweed.seaweedfs.com
resources: resources:
- seaweeds - masters
verbs: verbs:
- create - create
- delete - delete
@ -21,7 +21,7 @@ rules:
- apiGroups: - apiGroups:
- seaweed.seaweedfs.com - seaweed.seaweedfs.com
resources: resources:
- seaweeds/status - masters/status
verbs: verbs:
- get - get
- patch - patch

View File

@ -1,3 +1,3 @@
## This file is auto-generated, do not modify ## ## This file is auto-generated, do not modify ##
resources: resources:
- seaweed_v1_seaweed.yaml - seaweed_v1_master.yaml

View File

@ -1,7 +1,7 @@
apiVersion: seaweed.seaweedfs.com/v1 apiVersion: seaweed.seaweedfs.com/v1
kind: Seaweed kind: Master
metadata: metadata:
name: seaweed1 name: master-sample
spec: spec:
# Add fields here # Add fields here
foo: bar foo: bar

View File

@ -1,82 +0,0 @@
package controllers
import (
"context"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) ensureFilerServers(seaweedCR *seaweedv1.Seaweed) (done bool, result ctrl.Result, err error) {
_ = context.Background()
_ = r.Log.WithValues("seaweed", seaweedCR.Name)
if done, result, err = r.ensureFilerStatefulSet(seaweedCR); done {
return done, result, err
}
if done, result, err = r.ensureFilerService(seaweedCR); done {
return done, result, err
}
return false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) ensureFilerStatefulSet(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("sw-filer-statefulset", seaweedCR.Name)
filerStatefulSet := &appsv1.StatefulSet{}
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-filer", Namespace: seaweedCR.Namespace}, filerStatefulSet)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.createFilerStatefulSet(seaweedCR)
log.Info("Creating a new filer statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create new filer statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
return true, ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return false, ctrl.Result{}, nil
} else if err != nil {
log.Error(err, "Failed to get filer statefulset")
return true, ctrl.Result{}, err
}
log.Info("Get filer stateful set " + filerStatefulSet.Name)
return false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) ensureFilerService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("sw-filer-service", seaweedCR.Name)
volumeServerService := &corev1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-filer", Namespace: seaweedCR.Namespace}, volumeServerService)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.createFilerService(seaweedCR)
log.Info("Creating a new filer service", "Namespace", dep.Namespace, "Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create new filer service", "Namespace", dep.Namespace, "Name", dep.Name)
return true, ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return false, ctrl.Result{}, nil
} else if err != nil {
log.Error(err, "Failed to get filer server service")
return true, ctrl.Result{}, err
}
log.Info("Get filer service " + volumeServerService.Name)
return false, ctrl.Result{}, nil
}
func labelsForFiler(name string) map[string]string {
return map[string]string{"app": "seaweedfs", "role": "filer", "name": name}
}

View File

@ -1,50 +0,0 @@
package controllers
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) createFilerService(m *seaweedv1.Seaweed) *corev1.Service {
labels := labelsForFiler(m.Name)
dep := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-filer",
Namespace: m.Namespace,
Labels: labels,
Annotations: map[string]string{
"service.alpha.kubernetes.io/tolerate-unready-endpoints": "true",
},
},
Spec: corev1.ServiceSpec{
ClusterIP: "None",
PublishNotReadyAddresses: true,
Ports: []corev1.ServicePort{
{
Name: "swfs-filer",
Protocol: corev1.Protocol("TCP"),
Port: 8888,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 8888,
},
},
{
Name: "swfs-volume-grpc",
Protocol: corev1.Protocol("TCP"),
Port: 18888,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 18888,
},
},
},
Selector: labels,
},
}
return dep
}

View File

@ -1,134 +0,0 @@
package controllers
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) createFilerStatefulSet(m *seaweedv1.Seaweed) *appsv1.StatefulSet {
labels := labelsForFiler(m.Name)
replicas := int32(m.Spec.FilerCount)
rollingUpdatePartition := int32(0)
enableServiceLinks := false
dep := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-filer",
Namespace: m.Namespace,
},
Spec: appsv1.StatefulSetSpec{
ServiceName: m.Name + "-filer",
PodManagementPolicy: appsv1.ParallelPodManagement,
Replicas: &replicas,
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{
Partition: &rollingUpdatePartition,
},
},
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
EnableServiceLinks: &enableServiceLinks,
Containers: []corev1.Container{{
Name: "seaweedfs",
Image: "chrislusf/seaweedfs:latest",
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{
{
Name: "POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
},
Command: []string{
"/bin/sh",
"-ec",
fmt.Sprintf("weed filer -port=8888 %s %s",
fmt.Sprintf("-ip=$(POD_NAME).%s-filer", m.Name),
fmt.Sprintf("-peers=%s-master-0.%s-master:9333,%s-master-1.%s-master:9333,%s-master-2.%s-master:9333",
m.Name, m.Name, m.Name, m.Name, m.Name, m.Name),
),
},
Ports: []corev1.ContainerPort{
{
ContainerPort: 8888,
Name: "swfs-filer",
},
{
ContainerPort: 18888,
},
},
/*
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.IntOrString{
Type: 0,
IntVal: 9333,
},
Scheme: "http",
},
},
InitialDelaySeconds: 5,
TimeoutSeconds: 0,
PeriodSeconds: 15,
SuccessThreshold: 2,
FailureThreshold: 100,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.IntOrString{
Type: 0,
IntVal: 9333,
},
Scheme: "http",
},
},
InitialDelaySeconds: 20,
TimeoutSeconds: 0,
PeriodSeconds: 10,
SuccessThreshold: 1,
FailureThreshold: 6,
},
*/
}},
},
},
},
}
return dep
}

View File

@ -1,90 +0,0 @@
package controllers
import (
"context"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
const (
MasterClusterSize = 3
)
func (r *SeaweedReconciler) ensureMaster(seaweedCR *seaweedv1.Seaweed) (done bool, result ctrl.Result, err error) {
_ = context.Background()
_ = r.Log.WithValues("seaweed", seaweedCR.Name)
if done, result, err = r.ensureMasterService(seaweedCR); done {
return done, result, err
}
if done, result, err = r.ensureMasterStatefulSet(seaweedCR); done {
return done, result, err
}
return false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) ensureMasterStatefulSet(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("sw-master-statefulset", seaweedCR.Name)
masterStatefulSet := &appsv1.StatefulSet{}
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-master", Namespace: seaweedCR.Namespace}, masterStatefulSet)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.createMasterStatefulSet(seaweedCR)
log.Info("Creating a new master statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create new statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
return true, ctrl.Result{}, err
}
// sleep 60 seconds for DNS to have pod IP addresses ready
time.Sleep(time.Minute)
// Deployment created successfully - return and requeue
return false, ctrl.Result{}, nil
} else if err != nil {
log.Error(err, "Failed to get Deployment")
return true, ctrl.Result{}, err
}
log.Info("Get master stateful set " + masterStatefulSet.Name)
return false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) ensureMasterService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("sw-master-service", seaweedCR.Name)
masterService := &corev1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-master", Namespace: seaweedCR.Namespace}, masterService)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.createMasterService(seaweedCR)
log.Info("Creating a new master service", "Namespace", dep.Namespace, "Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create master service", "Namespace", dep.Namespace, "Name", dep.Name)
return true, ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return false, ctrl.Result{}, nil
} else if err != nil {
log.Error(err, "Failed to get master service", "Namespace", seaweedCR.Namespace, "Name", seaweedCR.Name+"-master")
return true, ctrl.Result{}, err
}
log.Info("Get master service " + masterService.Name)
return false, ctrl.Result{}, nil
}
func labelsForMaster(name string) map[string]string {
return map[string]string{"app": "seaweedfs", "role": "master", "name": name}
}

View File

@ -1,52 +0,0 @@
package controllers
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) createMasterService(m *seaweedv1.Seaweed) *corev1.Service {
labels := labelsForMaster(m.Name)
dep := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-master",
Namespace: m.Namespace,
Labels: labels,
Annotations: map[string]string{
"service.alpha.kubernetes.io/tolerate-unready-endpoints": "true",
},
},
Spec: corev1.ServiceSpec{
ClusterIP: "None",
PublishNotReadyAddresses: true,
Ports: []corev1.ServicePort{
{
Name: "swfs-master",
Protocol: corev1.Protocol("TCP"),
Port: 9333,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 9333,
},
},
{
Name: "swfs-master-grpc",
Protocol: corev1.Protocol("TCP"),
Port: 19333,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 19333,
},
},
},
Selector: labels,
},
}
// Set master instance as the owner and controller
// ctrl.SetControllerReference(m, dep, r.Scheme)
return dep
}

View File

@ -1,150 +0,0 @@
package controllers
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) createMasterStatefulSet(m *seaweedv1.Seaweed) *appsv1.StatefulSet {
labels := labelsForMaster(m.Name)
replicas := int32(MasterClusterSize)
rollingUpdatePartition := int32(0)
enableServiceLinks := false
dep := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-master",
Namespace: m.Namespace,
},
Spec: appsv1.StatefulSetSpec{
ServiceName: m.Name + "-master",
PodManagementPolicy: appsv1.ParallelPodManagement,
Replicas: &replicas,
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{
Partition: &rollingUpdatePartition,
},
},
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
/*
Affinity: &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: labels,
},
TopologyKey: "kubernetes.io/hostname",
},
},
},
},
*/
EnableServiceLinks: &enableServiceLinks,
Containers: []corev1.Container{{
Name: "seaweedfs",
Image: "chrislusf/seaweedfs:latest",
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{
{
Name: "POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
},
Command: []string{
"/bin/sh",
"-ec",
fmt.Sprintf("sleep 60; weed master -volumePreallocate -volumeSizeLimitMB=1000 %s %s",
fmt.Sprintf("-ip=$(POD_NAME).%s-master", m.Name),
fmt.Sprintf("-peers=%s-master-0.%s-master:9333,%s-master-1.%s-master:9333,%s-master-2.%s-master:9333",
m.Name, m.Name, m.Name, m.Name, m.Name, m.Name),
),
},
Ports: []corev1.ContainerPort{
{
ContainerPort: 9333,
Name: "swfs-master",
},
{
ContainerPort: 19333,
},
},
/*
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.IntOrString{
Type: 0,
IntVal: 9333,
},
Scheme: "http",
},
},
InitialDelaySeconds: 5,
TimeoutSeconds: 0,
PeriodSeconds: 15,
SuccessThreshold: 2,
FailureThreshold: 100,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.IntOrString{
Type: 0,
IntVal: 9333,
},
Scheme: "http",
},
},
InitialDelaySeconds: 20,
TimeoutSeconds: 0,
PeriodSeconds: 10,
SuccessThreshold: 1,
FailureThreshold: 6,
},
*/
}},
},
},
},
}
// Set master instance as the owner and controller
// ctrl.SetControllerReference(m, dep, r.Scheme)
return dep
}

View File

@ -1,82 +0,0 @@
package controllers
import (
"context"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) ensureS3Servers(seaweedCR *seaweedv1.Seaweed) (done bool, result ctrl.Result, err error) {
_ = context.Background()
_ = r.Log.WithValues("seaweed", seaweedCR.Name)
if done, result, err = r.ensureS3Deployment(seaweedCR); done {
return done, result, err
}
if done, result, err = r.ensureS3Service(seaweedCR); done {
return done, result, err
}
return false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) ensureS3Deployment(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("sw-s3-statefulset", seaweedCR.Name)
s3Deployment := &appsv1.Deployment{}
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-s3", Namespace: seaweedCR.Namespace}, s3Deployment)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.createS3Deployment(seaweedCR)
log.Info("Creating a new s3 deployment", "Namespace", dep.Namespace, "Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create new s3 statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
return true, ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return false, ctrl.Result{}, nil
} else if err != nil {
log.Error(err, "Failed to get s3 statefulset")
return true, ctrl.Result{}, err
}
log.Info("Get s3 stateful set " + s3Deployment.Name)
return false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) ensureS3Service(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("sw-filer-service", seaweedCR.Name)
s3Service := &corev1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-s3", Namespace: seaweedCR.Namespace}, s3Service)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.createS3Service(seaweedCR)
log.Info("Creating a new s3 service", "Namespace", dep.Namespace, "Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create new s3 service", "Namespace", dep.Namespace, "Name", dep.Name)
return true, ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return false, ctrl.Result{}, nil
} else if err != nil {
log.Error(err, "Failed to get s3 server service")
return true, ctrl.Result{}, err
}
log.Info("Get s3 service " + s3Service.Name)
return false, ctrl.Result{}, nil
}
func labelsForS3(name string) map[string]string {
return map[string]string{"app": "seaweedfs", "role": "s3", "name": name}
}

View File

@ -1,123 +0,0 @@
package controllers
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) createS3Deployment(m *seaweedv1.Seaweed) *appsv1.Deployment {
labels := labelsForS3(m.Name)
replicas := int32(m.Spec.S3Count)
enableServiceLinks := false
dep := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-s3",
Namespace: m.Namespace,
},
Spec: appsv1.DeploymentSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
EnableServiceLinks: &enableServiceLinks,
Containers: []corev1.Container{{
Name: "seaweedfs",
Image: "chrislusf/seaweedfs:latest",
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{
{
Name: "POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
},
Command: []string{
"/bin/sh",
"-ec",
fmt.Sprintf("weed s3 -port=8333 %s",
fmt.Sprintf("-filer=$(POD_NAME).%s-filer:8888", m.Name),
),
},
Ports: []corev1.ContainerPort{
{
ContainerPort: 8333,
Name: "swfs-s3",
},
{
ContainerPort: 18333,
},
},
/*
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.IntOrString{
Type: 0,
IntVal: 9333,
},
Scheme: "http",
},
},
InitialDelaySeconds: 5,
TimeoutSeconds: 0,
PeriodSeconds: 15,
SuccessThreshold: 2,
FailureThreshold: 100,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.IntOrString{
Type: 0,
IntVal: 9333,
},
Scheme: "http",
},
},
InitialDelaySeconds: 20,
TimeoutSeconds: 0,
PeriodSeconds: 10,
SuccessThreshold: 1,
FailureThreshold: 6,
},
*/
}},
},
},
},
}
return dep
}

View File

@ -1,36 +0,0 @@
package controllers
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) createS3Service(m *seaweedv1.Seaweed) *corev1.Service {
labels := labelsForS3(m.Name)
dep := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-s3",
Namespace: m.Namespace,
Labels: labels,
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "swfs-s3",
Protocol: corev1.Protocol("TCP"),
Port: 8333,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 8333,
},
},
},
Selector: labels,
},
}
return dep
}

View File

@ -1,82 +0,0 @@
package controllers
import (
"context"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) ensureVolumeServers(seaweedCR *seaweedv1.Seaweed) (done bool, result ctrl.Result, err error) {
_ = context.Background()
_ = r.Log.WithValues("seaweed", seaweedCR.Name)
if done, result, err = r.ensureVolumeServerStatefulSet(seaweedCR); done {
return done, result, err
}
if done, result, err = r.ensureVolumeServerService(seaweedCR); done {
return done, result, err
}
return false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) ensureVolumeServerStatefulSet(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("sw-volume-statefulset", seaweedCR.Name)
volumeServerStatefulSet := &appsv1.StatefulSet{}
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-volume", Namespace: seaweedCR.Namespace}, volumeServerStatefulSet)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.createVolumeServerStatefulSet(seaweedCR)
log.Info("Creating a new volume statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create new volume statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
return true, ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return false, ctrl.Result{}, nil
} else if err != nil {
log.Error(err, "Failed to get volume server statefulset")
return true, ctrl.Result{}, err
}
log.Info("Get volume stateful set " + volumeServerStatefulSet.Name)
return false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) ensureVolumeServerService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("sw-volume-service", seaweedCR.Name)
volumeServerService := &corev1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-volume", Namespace: seaweedCR.Namespace}, volumeServerService)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.createVolumeServerService(seaweedCR)
log.Info("Creating a new volume service", "Namespace", dep.Namespace, "Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create new volume service", "Namespace", dep.Namespace, "Name", dep.Name)
return true, ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return false, ctrl.Result{}, nil
} else if err != nil {
log.Error(err, "Failed to get volume server service")
return true, ctrl.Result{}, err
}
log.Info("Get volume service " + volumeServerService.Name)
return false, ctrl.Result{}, nil
}
func labelsForVolumeServer(name string) map[string]string {
return map[string]string{"app": "seaweedfs", "role": "volume", "name": name}
}

View File

@ -1,50 +0,0 @@
package controllers
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) createVolumeServerService(m *seaweedv1.Seaweed) *corev1.Service {
labels := labelsForVolumeServer(m.Name)
dep := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-volume",
Namespace: m.Namespace,
Labels: labels,
Annotations: map[string]string{
"service.alpha.kubernetes.io/tolerate-unready-endpoints": "true",
},
},
Spec: corev1.ServiceSpec{
ClusterIP: "None",
PublishNotReadyAddresses: true,
Ports: []corev1.ServicePort{
{
Name: "swfs-volume",
Protocol: corev1.Protocol("TCP"),
Port: 8444,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 8444,
},
},
{
Name: "swfs-volume-grpc",
Protocol: corev1.Protocol("TCP"),
Port: 18444,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 18444,
},
},
},
Selector: labels,
},
}
return dep
}

View File

@ -1,134 +0,0 @@
package controllers
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) createVolumeServerStatefulSet(m *seaweedv1.Seaweed) *appsv1.StatefulSet {
labels := labelsForVolumeServer(m.Name)
replicas := int32(m.Spec.VolumeServerCount)
rollingUpdatePartition := int32(0)
enableServiceLinks := false
dep := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-volume",
Namespace: m.Namespace,
},
Spec: appsv1.StatefulSetSpec{
ServiceName: m.Name + "-volume",
PodManagementPolicy: appsv1.ParallelPodManagement,
Replicas: &replicas,
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{
Partition: &rollingUpdatePartition,
},
},
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
EnableServiceLinks: &enableServiceLinks,
Containers: []corev1.Container{{
Name: "seaweedfs",
Image: "chrislusf/seaweedfs:latest",
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{
{
Name: "POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
},
Command: []string{
"/bin/sh",
"-ec",
fmt.Sprintf("weed volume -port=8444 -max=0 %s %s",
fmt.Sprintf("-ip=$(POD_NAME).%s-volume", m.Name),
fmt.Sprintf("-mserver=%s-master-0.%s-master:9333,%s-master-1.%s-master:9333,%s-master-2.%s-master:9333",
m.Name, m.Name, m.Name, m.Name, m.Name, m.Name),
),
},
Ports: []corev1.ContainerPort{
{
ContainerPort: 8444,
Name: "swfs-volume",
},
{
ContainerPort: 18444,
},
},
/*
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.IntOrString{
Type: 0,
IntVal: 9333,
},
Scheme: "http",
},
},
InitialDelaySeconds: 5,
TimeoutSeconds: 0,
PeriodSeconds: 15,
SuccessThreshold: 2,
FailureThreshold: 100,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.IntOrString{
Type: 0,
IntVal: 9333,
},
Scheme: "http",
},
},
InitialDelaySeconds: 20,
TimeoutSeconds: 0,
PeriodSeconds: 10,
SuccessThreshold: 1,
FailureThreshold: 6,
},
*/
}},
},
},
},
}
return dep
}

View File

@ -0,0 +1,181 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"context"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"reflect"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
// MasterReconciler reconciles a Master object
type MasterReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=seaweed.seaweedfs.com,resources=masters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=seaweed.seaweedfs.com,resources=masters/status,verbs=get;update;patch
func (r *MasterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("master", req.NamespacedName)
// Fetch the Master instance
master := &seaweedv1.Master{}
err := r.Get(ctx, req.NamespacedName, master)
if err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
// Return and don't requeue
log.Info("Master resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
log.Error(err, "Failed to get Master")
return ctrl.Result{}, err
}
// Check if the deployment already exists, if not create a new one
found := &appsv1.Deployment{}
err = r.Get(ctx, types.NamespacedName{Name: master.Name, Namespace: master.Namespace}, found)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.deploymentForMaster(master)
log.Info("Creating a new Deployment", "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create new Deployment", "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
return ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return ctrl.Result{Requeue: true}, nil
} else if err != nil {
log.Error(err, "Failed to get Deployment")
return ctrl.Result{}, err
}
// Ensure the deployment size is the same as the spec
size := master.Spec.Size
if *found.Spec.Replicas != size {
found.Spec.Replicas = &size
err = r.Update(ctx, found)
if err != nil {
log.Error(err, "Failed to update Deployment", "Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
return ctrl.Result{}, err
}
// Spec updated - return and requeue
return ctrl.Result{Requeue: true}, nil
}
// Update the Master status with the pod names
// List the pods for this master's deployment
podList := &corev1.PodList{}
listOpts := []client.ListOption{
client.InNamespace(master.Namespace),
client.MatchingLabels(labelsForMaster(master.Name)),
}
if err = r.List(ctx, podList, listOpts...); err != nil {
log.Error(err, "Failed to list pods", "Master.Namespace", master.Namespace, "Master.Name", master.Name)
return ctrl.Result{}, err
}
podNames := getPodNames(podList.Items)
// Update status.Nodes if needed
if !reflect.DeepEqual(podNames, master.Status.Nodes) {
master.Status.Nodes = podNames
err := r.Status().Update(ctx, master)
if err != nil {
log.Error(err, "Failed to update Master status")
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
// deploymentForMaster returns a master Deployment object
func (r *MasterReconciler) deploymentForMaster(m *seaweedv1.Master) *appsv1.Deployment {
ls := labelsForMaster(m.Name)
replicas := m.Spec.Size
dep := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name,
Namespace: m.Namespace,
},
Spec: appsv1.DeploymentSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: ls,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: ls,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Image: "Master:1.4.36-alpine",
Name: "Master",
Command: []string{"Master", "-m=64", "-o", "modern", "-v"},
Ports: []corev1.ContainerPort{{
ContainerPort: 11211,
Name: "Master",
}},
}},
},
},
},
}
// Set Master instance as the owner and controller
ctrl.SetControllerReference(m, dep, r.Scheme)
return dep
}
// labelsForMaster returns the labels for selecting the resources
// belonging to the given Master CR name.
func labelsForMaster(name string) map[string]string {
return map[string]string{"app": "Master", "Master_cr": name}
}
// getPodNames returns the pod names of the array of pods passed in
func getPodNames(pods []corev1.Pod) []string {
var podNames []string
for _, pod := range pods {
podNames = append(podNames, pod.Name)
}
return podNames
}
func (r *MasterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&seaweedv1.Master{}).
Complete(r)
}

View File

@ -1,107 +0,0 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"time"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
// SeaweedReconciler reconciles a Seaweed object
type SeaweedReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=seaweed.seaweedfs.com,resources=seaweeds,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=seaweed.seaweedfs.com,resources=seaweeds/status,verbs=get;update;patch
func (r *SeaweedReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("seaweed", req.NamespacedName)
log.Info("start Reconcile ...")
seaweedCR, done, result, err := r.findSeaweedCustomResourceInstance(ctx, log, req)
if done {
return result, err
}
// temporary
if seaweedCR.Spec.VolumeServerCount == 0 {
seaweedCR.Spec.VolumeServerCount = 1
}
if seaweedCR.Spec.FilerCount == 0 {
seaweedCR.Spec.FilerCount = 1
}
if seaweedCR.Spec.S3Count == 0 {
seaweedCR.Spec.S3Count = 1
}
if done, result, err = r.ensureMaster(seaweedCR); done {
return result, err
}
if done, result, err = r.ensureVolumeServers(seaweedCR); done {
return result, err
}
if done, result, err = r.ensureFilerServers(seaweedCR); done {
return result, err
}
if done, result, err = r.ensureS3Servers(seaweedCR); done {
return result, err
}
return ctrl.Result{}, nil
}
func (r *SeaweedReconciler) findSeaweedCustomResourceInstance(ctx context.Context, log logr.Logger, req ctrl.Request) (*seaweedv1.Seaweed, bool, ctrl.Result, error) {
// fetch the master instance
seaweedCR := &seaweedv1.Seaweed{}
err := r.Get(ctx, req.NamespacedName, seaweedCR)
if err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
// Return and don't requeue
log.Info("Seaweed CR not found. Ignoring since object must be deleted")
return nil, true, ctrl.Result{RequeueAfter: time.Second * 5}, nil
}
// Error reading the object - requeue the request.
log.Error(err, "Failed to get SeaweedCR")
return nil, true, ctrl.Result{}, err
}
log.Info("Get master " + seaweedCR.Name)
return seaweedCR, false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&seaweedv1.Seaweed{}).
Complete(r)
}

1
go.mod
View File

@ -6,7 +6,6 @@ require (
github.com/go-logr/logr v0.1.0 github.com/go-logr/logr v0.1.0
github.com/onsi/ginkgo v1.11.0 github.com/onsi/ginkgo v1.11.0
github.com/onsi/gomega v1.8.1 github.com/onsi/gomega v1.8.1
k8s.io/api v0.18.2
k8s.io/apimachinery v0.18.2 k8s.io/apimachinery v0.18.2
k8s.io/client-go v0.18.2 k8s.io/client-go v0.18.2
sigs.k8s.io/controller-runtime v0.6.0 sigs.k8s.io/controller-runtime v0.6.0

View File

@ -67,12 +67,12 @@ func main() {
os.Exit(1) os.Exit(1)
} }
if err = (&controllers.SeaweedReconciler{ if err = (&controllers.MasterReconciler{
Client: mgr.GetClient(), Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("Seaweed"), Log: ctrl.Log.WithName("controllers").WithName("Master"),
Scheme: mgr.GetScheme(), Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil { }).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Seaweed") setupLog.Error(err, "unable to create controller", "controller", "Master")
os.Exit(1) os.Exit(1)
} }
// +kubebuilder:scaffold:builder // +kubebuilder:scaffold:builder

View File

@ -1,15 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4"> <module type="WEB_MODULE" version="4">
<component name="CheckStyle-IDEA-Module">
<option name="configuration">
<map />
</option>
</component>
<component name="Go" enabled="true" /> <component name="Go" enabled="true" />
<component name="NewModuleRootManager" inherit-compiler-output="true"> <component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output /> <exclude-output />
<content url="file://$MODULE_DIR$" /> <content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
</component> </component>
</module> </module>