Compare commits

...

6 Commits

Author SHA1 Message Date
Chris Lu
dcd6f8e056 master controller 2020-10-14 21:29:18 -07:00
Chris Lu
b1706bd2ed make manifests 2020-10-14 20:15:06 -07:00
Chris Lu
9859b72c74 add changes to MasterSpec and MasterStatus 2020-10-14 20:14:30 -07:00
Chris Lu
b346842ea3 operator-sdk create api --group seaweed --version v1 --kind Master --resource=true --controller=true 2020-10-14 20:09:08 -07:00
Chris Lu
15b319e331 prepare env 2020-10-14 20:08:09 -07:00
Chris Lu
45d81b6928 operator-sdk init --domain=seaweedfs.com 2020-10-14 19:51:59 -07:00
31 changed files with 278 additions and 1268 deletions

View File

@ -12,7 +12,7 @@ endif
BUNDLE_METADATA_OPTS ?= $(BUNDLE_CHANNELS) $(BUNDLE_DEFAULT_CHANNEL)
# Image URL to use all building/pushing image targets
IMG ?= controller:latest
IMG ?= seaweedfs/operator:latest
# Produce CRDs that work back to Kubernetes 1.11 (no version conversion)
CRD_OPTIONS ?= "crd:trivialVersions=true"
@ -26,8 +26,11 @@ endif
all: manager
# Run tests
ENVTEST_ASSETS_DIR=$(shell pwd)/testbin
test: generate fmt vet manifests
go test ./... -coverprofile cover.out
mkdir -p ${ENVTEST_ASSETS_DIR}
test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/master/hack/setup-envtest.sh
source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); go test ./... -coverprofile cover.out
# Build manager binary
manager: generate fmt vet

View File

@ -3,7 +3,7 @@ layout: go.kubebuilder.io/v2
repo: github.com/seaweedfs/seaweedfs-operator
resources:
- group: seaweed
kind: Seaweed
kind: Master
version: v1
version: 3-alpha
plugins:

View File

@ -20,3 +20,9 @@ $ make run ENABLE_WEBHOOKS=false
$ kubectl apply -f config/samples/seaweed_v1_seaweed.yaml
```
## Create API and Controller
Here are the commands used to create customer resource definition (CRD)
```
operator-sdk create api --group seaweed --version v1 --kind Master --resource=true --controller=true
```

View File

@ -23,51 +23,43 @@ import (
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// SeaweedSpec defines the desired state of Seaweed
type SeaweedSpec struct {
// MasterSpec defines the desired state of Master
type MasterSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
// MetricsAddress is Prometheus gateway address
MetricsAddress string `json:"metricsAddress,omitempty"`
// VolumeServerCount is the number of volume servers, default to 1
VolumeServerCount int `json:"volumeServerCount,omitempty"`
// FilerCount is the number of filers, default to 1
FilerCount int `json:"filerCount,omitempty"`
// S3Count is the number of s3, default to 1
S3Count int `json:"s3Count,omitempty"`
// +kubebuilder:validation:Minimum=0
// Size is the size of the master deployment
Size int32 `json:"size"`
}
// SeaweedStatus defines the observed state of Seaweed
type SeaweedStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
// MasterStatus defines the observed state of Master
type MasterStatus struct {
// Nodes are the names of the master pods
Nodes []string `json:"nodes"`
}
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// Seaweed is the Schema for the seaweeds API
type Seaweed struct {
// Master is the Schema for the masters API
type Master struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec SeaweedSpec `json:"spec,omitempty"`
Status SeaweedStatus `json:"status,omitempty"`
Spec MasterSpec `json:"spec,omitempty"`
Status MasterStatus `json:"status,omitempty"`
}
// +kubebuilder:object:root=true
// SeaweedList contains a list of Seaweed
type SeaweedList struct {
// MasterList contains a list of Master
type MasterList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []Seaweed `json:"items"`
Items []Master `json:"items"`
}
func init() {
SchemeBuilder.Register(&Seaweed{}, &SeaweedList{})
SchemeBuilder.Register(&Master{}, &MasterList{})
}

View File

@ -25,26 +25,26 @@ import (
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Seaweed) DeepCopyInto(out *Seaweed) {
func (in *Master) DeepCopyInto(out *Master) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
out.Spec = in.Spec
out.Status = in.Status
in.Status.DeepCopyInto(&out.Status)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Seaweed.
func (in *Seaweed) DeepCopy() *Seaweed {
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Master.
func (in *Master) DeepCopy() *Master {
if in == nil {
return nil
}
out := new(Seaweed)
out := new(Master)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *Seaweed) DeepCopyObject() runtime.Object {
func (in *Master) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
@ -52,31 +52,31 @@ func (in *Seaweed) DeepCopyObject() runtime.Object {
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SeaweedList) DeepCopyInto(out *SeaweedList) {
func (in *MasterList) DeepCopyInto(out *MasterList) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ListMeta.DeepCopyInto(&out.ListMeta)
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]Seaweed, len(*in))
*out = make([]Master, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SeaweedList.
func (in *SeaweedList) DeepCopy() *SeaweedList {
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MasterList.
func (in *MasterList) DeepCopy() *MasterList {
if in == nil {
return nil
}
out := new(SeaweedList)
out := new(MasterList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *SeaweedList) DeepCopyObject() runtime.Object {
func (in *MasterList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
@ -84,31 +84,36 @@ func (in *SeaweedList) DeepCopyObject() runtime.Object {
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SeaweedSpec) DeepCopyInto(out *SeaweedSpec) {
func (in *MasterSpec) DeepCopyInto(out *MasterSpec) {
*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SeaweedSpec.
func (in *SeaweedSpec) DeepCopy() *SeaweedSpec {
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MasterSpec.
func (in *MasterSpec) DeepCopy() *MasterSpec {
if in == nil {
return nil
}
out := new(SeaweedSpec)
out := new(MasterSpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SeaweedStatus) DeepCopyInto(out *SeaweedStatus) {
func (in *MasterStatus) DeepCopyInto(out *MasterStatus) {
*out = *in
if in.Nodes != nil {
in, out := &in.Nodes, &out.Nodes
*out = make([]string, len(*in))
copy(*out, *in)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SeaweedStatus.
func (in *SeaweedStatus) DeepCopy() *SeaweedStatus {
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MasterStatus.
func (in *MasterStatus) DeepCopy() *MasterStatus {
if in == nil {
return nil
}
out := new(SeaweedStatus)
out := new(MasterStatus)
in.DeepCopyInto(out)
return out
}

View File

@ -6,20 +6,20 @@ metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.3.0
creationTimestamp: null
name: seaweeds.seaweed.seaweedfs.com
name: masters.seaweed.seaweedfs.com
spec:
group: seaweed.seaweedfs.com
names:
kind: Seaweed
listKind: SeaweedList
plural: seaweeds
singular: seaweed
kind: Master
listKind: MasterList
plural: masters
singular: master
scope: Namespaced
subresources:
status: {}
validation:
openAPIV3Schema:
description: Seaweed is the Schema for the seaweeds API
description: Master is the Schema for the masters API
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
@ -34,24 +34,26 @@ spec:
metadata:
type: object
spec:
description: SeaweedSpec defines the desired state of Seaweed
description: MasterSpec defines the desired state of Master
properties:
filerCount:
description: FilerCount is the number of filers, default to 1
type: integer
metricsAddress:
description: MetricsAddress is Prometheus gateway address
type: string
s3Count:
description: S3Count is the number of s3, default to 1
type: integer
volumeServerCount:
description: VolumeServerCount is the number of volume servers, default
to 1
size:
description: Size is the size of the master deployment
format: int32
minimum: 0
type: integer
required:
- size
type: object
status:
description: SeaweedStatus defines the observed state of Seaweed
description: MasterStatus defines the observed state of Master
properties:
nodes:
description: Nodes are the names of the master pods
items:
type: string
type: array
required:
- nodes
type: object
type: object
version: v1

View File

@ -2,18 +2,18 @@
# since it depends on service name and namespace that are out of this kustomize package.
# It should be run by config/default
resources:
- bases/seaweed.seaweedfs.com_seaweeds.yaml
- bases/seaweed.seaweedfs.com_masters.yaml
# +kubebuilder:scaffold:crdkustomizeresource
patchesStrategicMerge:
# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix.
# patches here are for enabling the conversion webhook for each CRD
#- patches/webhook_in_seaweeds.yaml
#- patches/webhook_in_masters.yaml
# +kubebuilder:scaffold:crdkustomizewebhookpatch
# [CERTMANAGER] To enable webhook, uncomment all the sections with [CERTMANAGER] prefix.
# patches here are for enabling the CA injection for each CRD
#- patches/cainjection_in_seaweeds.yaml
#- patches/cainjection_in_masters.yaml
# +kubebuilder:scaffold:crdkustomizecainjectionpatch
# the following config is for teaching kustomize how to do kustomization for CRDs.

View File

@ -5,4 +5,4 @@ kind: CustomResourceDefinition
metadata:
annotations:
cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME)
name: seaweeds.seaweed.seaweedfs.com
name: masters.seaweed.seaweedfs.com

View File

@ -3,7 +3,7 @@
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: seaweeds.seaweed.seaweedfs.com
name: masters.seaweed.seaweedfs.com
spec:
conversion:
strategy: Webhook

View File

@ -1,13 +1,13 @@
# permissions for end users to edit seaweeds.
# permissions for end users to edit masters.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: seaweed-editor-role
name: master-editor-role
rules:
- apiGroups:
- seaweed.seaweedfs.com
resources:
- seaweeds
- masters
verbs:
- create
- delete
@ -19,6 +19,6 @@ rules:
- apiGroups:
- seaweed.seaweedfs.com
resources:
- seaweeds/status
- masters/status
verbs:
- get

View File

@ -1,13 +1,13 @@
# permissions for end users to view seaweeds.
# permissions for end users to view masters.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: seaweed-viewer-role
name: master-viewer-role
rules:
- apiGroups:
- seaweed.seaweedfs.com
resources:
- seaweeds
- masters
verbs:
- get
- list
@ -15,6 +15,6 @@ rules:
- apiGroups:
- seaweed.seaweedfs.com
resources:
- seaweeds/status
- masters/status
verbs:
- get

View File

@ -9,7 +9,7 @@ rules:
- apiGroups:
- seaweed.seaweedfs.com
resources:
- seaweeds
- masters
verbs:
- create
- delete
@ -21,7 +21,7 @@ rules:
- apiGroups:
- seaweed.seaweedfs.com
resources:
- seaweeds/status
- masters/status
verbs:
- get
- patch

View File

@ -1,3 +1,3 @@
## This file is auto-generated, do not modify ##
resources:
- seaweed_v1_seaweed.yaml
- seaweed_v1_master.yaml

View File

@ -1,7 +1,7 @@
apiVersion: seaweed.seaweedfs.com/v1
kind: Seaweed
kind: Master
metadata:
name: seaweed1
name: master-sample
spec:
# Add fields here
foo: bar

View File

@ -1,82 +0,0 @@
package controllers
import (
"context"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) ensureFilerServers(seaweedCR *seaweedv1.Seaweed) (done bool, result ctrl.Result, err error) {
_ = context.Background()
_ = r.Log.WithValues("seaweed", seaweedCR.Name)
if done, result, err = r.ensureFilerStatefulSet(seaweedCR); done {
return done, result, err
}
if done, result, err = r.ensureFilerService(seaweedCR); done {
return done, result, err
}
return false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) ensureFilerStatefulSet(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("sw-filer-statefulset", seaweedCR.Name)
filerStatefulSet := &appsv1.StatefulSet{}
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-filer", Namespace: seaweedCR.Namespace}, filerStatefulSet)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.createFilerStatefulSet(seaweedCR)
log.Info("Creating a new filer statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create new filer statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
return true, ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return false, ctrl.Result{}, nil
} else if err != nil {
log.Error(err, "Failed to get filer statefulset")
return true, ctrl.Result{}, err
}
log.Info("Get filer stateful set " + filerStatefulSet.Name)
return false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) ensureFilerService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("sw-filer-service", seaweedCR.Name)
volumeServerService := &corev1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-filer", Namespace: seaweedCR.Namespace}, volumeServerService)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.createFilerService(seaweedCR)
log.Info("Creating a new filer service", "Namespace", dep.Namespace, "Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create new filer service", "Namespace", dep.Namespace, "Name", dep.Name)
return true, ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return false, ctrl.Result{}, nil
} else if err != nil {
log.Error(err, "Failed to get filer server service")
return true, ctrl.Result{}, err
}
log.Info("Get filer service " + volumeServerService.Name)
return false, ctrl.Result{}, nil
}
func labelsForFiler(name string) map[string]string {
return map[string]string{"app": "seaweedfs", "role": "filer", "name": name}
}

View File

@ -1,50 +0,0 @@
package controllers
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) createFilerService(m *seaweedv1.Seaweed) *corev1.Service {
labels := labelsForFiler(m.Name)
dep := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-filer",
Namespace: m.Namespace,
Labels: labels,
Annotations: map[string]string{
"service.alpha.kubernetes.io/tolerate-unready-endpoints": "true",
},
},
Spec: corev1.ServiceSpec{
ClusterIP: "None",
PublishNotReadyAddresses: true,
Ports: []corev1.ServicePort{
{
Name: "swfs-filer",
Protocol: corev1.Protocol("TCP"),
Port: 8888,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 8888,
},
},
{
Name: "swfs-volume-grpc",
Protocol: corev1.Protocol("TCP"),
Port: 18888,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 18888,
},
},
},
Selector: labels,
},
}
return dep
}

View File

@ -1,134 +0,0 @@
package controllers
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) createFilerStatefulSet(m *seaweedv1.Seaweed) *appsv1.StatefulSet {
labels := labelsForFiler(m.Name)
replicas := int32(m.Spec.FilerCount)
rollingUpdatePartition := int32(0)
enableServiceLinks := false
dep := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-filer",
Namespace: m.Namespace,
},
Spec: appsv1.StatefulSetSpec{
ServiceName: m.Name + "-filer",
PodManagementPolicy: appsv1.ParallelPodManagement,
Replicas: &replicas,
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{
Partition: &rollingUpdatePartition,
},
},
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
EnableServiceLinks: &enableServiceLinks,
Containers: []corev1.Container{{
Name: "seaweedfs",
Image: "chrislusf/seaweedfs:latest",
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{
{
Name: "POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
},
Command: []string{
"/bin/sh",
"-ec",
fmt.Sprintf("weed filer -port=8888 %s %s",
fmt.Sprintf("-ip=$(POD_NAME).%s-filer", m.Name),
fmt.Sprintf("-peers=%s-master-0.%s-master:9333,%s-master-1.%s-master:9333,%s-master-2.%s-master:9333",
m.Name, m.Name, m.Name, m.Name, m.Name, m.Name),
),
},
Ports: []corev1.ContainerPort{
{
ContainerPort: 8888,
Name: "swfs-filer",
},
{
ContainerPort: 18888,
},
},
/*
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.IntOrString{
Type: 0,
IntVal: 9333,
},
Scheme: "http",
},
},
InitialDelaySeconds: 5,
TimeoutSeconds: 0,
PeriodSeconds: 15,
SuccessThreshold: 2,
FailureThreshold: 100,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.IntOrString{
Type: 0,
IntVal: 9333,
},
Scheme: "http",
},
},
InitialDelaySeconds: 20,
TimeoutSeconds: 0,
PeriodSeconds: 10,
SuccessThreshold: 1,
FailureThreshold: 6,
},
*/
}},
},
},
},
}
return dep
}

View File

@ -1,90 +0,0 @@
package controllers
import (
"context"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
const (
MasterClusterSize = 3
)
func (r *SeaweedReconciler) ensureMaster(seaweedCR *seaweedv1.Seaweed) (done bool, result ctrl.Result, err error) {
_ = context.Background()
_ = r.Log.WithValues("seaweed", seaweedCR.Name)
if done, result, err = r.ensureMasterService(seaweedCR); done {
return done, result, err
}
if done, result, err = r.ensureMasterStatefulSet(seaweedCR); done {
return done, result, err
}
return false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) ensureMasterStatefulSet(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("sw-master-statefulset", seaweedCR.Name)
masterStatefulSet := &appsv1.StatefulSet{}
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-master", Namespace: seaweedCR.Namespace}, masterStatefulSet)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.createMasterStatefulSet(seaweedCR)
log.Info("Creating a new master statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create new statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
return true, ctrl.Result{}, err
}
// sleep 60 seconds for DNS to have pod IP addresses ready
time.Sleep(time.Minute)
// Deployment created successfully - return and requeue
return false, ctrl.Result{}, nil
} else if err != nil {
log.Error(err, "Failed to get Deployment")
return true, ctrl.Result{}, err
}
log.Info("Get master stateful set " + masterStatefulSet.Name)
return false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) ensureMasterService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("sw-master-service", seaweedCR.Name)
masterService := &corev1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-master", Namespace: seaweedCR.Namespace}, masterService)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.createMasterService(seaweedCR)
log.Info("Creating a new master service", "Namespace", dep.Namespace, "Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create master service", "Namespace", dep.Namespace, "Name", dep.Name)
return true, ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return false, ctrl.Result{}, nil
} else if err != nil {
log.Error(err, "Failed to get master service", "Namespace", seaweedCR.Namespace, "Name", seaweedCR.Name+"-master")
return true, ctrl.Result{}, err
}
log.Info("Get master service " + masterService.Name)
return false, ctrl.Result{}, nil
}
func labelsForMaster(name string) map[string]string {
return map[string]string{"app": "seaweedfs", "role": "master", "name": name}
}

View File

@ -1,52 +0,0 @@
package controllers
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) createMasterService(m *seaweedv1.Seaweed) *corev1.Service {
labels := labelsForMaster(m.Name)
dep := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-master",
Namespace: m.Namespace,
Labels: labels,
Annotations: map[string]string{
"service.alpha.kubernetes.io/tolerate-unready-endpoints": "true",
},
},
Spec: corev1.ServiceSpec{
ClusterIP: "None",
PublishNotReadyAddresses: true,
Ports: []corev1.ServicePort{
{
Name: "swfs-master",
Protocol: corev1.Protocol("TCP"),
Port: 9333,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 9333,
},
},
{
Name: "swfs-master-grpc",
Protocol: corev1.Protocol("TCP"),
Port: 19333,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 19333,
},
},
},
Selector: labels,
},
}
// Set master instance as the owner and controller
// ctrl.SetControllerReference(m, dep, r.Scheme)
return dep
}

View File

@ -1,150 +0,0 @@
package controllers
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) createMasterStatefulSet(m *seaweedv1.Seaweed) *appsv1.StatefulSet {
labels := labelsForMaster(m.Name)
replicas := int32(MasterClusterSize)
rollingUpdatePartition := int32(0)
enableServiceLinks := false
dep := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-master",
Namespace: m.Namespace,
},
Spec: appsv1.StatefulSetSpec{
ServiceName: m.Name + "-master",
PodManagementPolicy: appsv1.ParallelPodManagement,
Replicas: &replicas,
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{
Partition: &rollingUpdatePartition,
},
},
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
/*
Affinity: &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: labels,
},
TopologyKey: "kubernetes.io/hostname",
},
},
},
},
*/
EnableServiceLinks: &enableServiceLinks,
Containers: []corev1.Container{{
Name: "seaweedfs",
Image: "chrislusf/seaweedfs:latest",
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{
{
Name: "POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
},
Command: []string{
"/bin/sh",
"-ec",
fmt.Sprintf("sleep 60; weed master -volumePreallocate -volumeSizeLimitMB=1000 %s %s",
fmt.Sprintf("-ip=$(POD_NAME).%s-master", m.Name),
fmt.Sprintf("-peers=%s-master-0.%s-master:9333,%s-master-1.%s-master:9333,%s-master-2.%s-master:9333",
m.Name, m.Name, m.Name, m.Name, m.Name, m.Name),
),
},
Ports: []corev1.ContainerPort{
{
ContainerPort: 9333,
Name: "swfs-master",
},
{
ContainerPort: 19333,
},
},
/*
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.IntOrString{
Type: 0,
IntVal: 9333,
},
Scheme: "http",
},
},
InitialDelaySeconds: 5,
TimeoutSeconds: 0,
PeriodSeconds: 15,
SuccessThreshold: 2,
FailureThreshold: 100,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.IntOrString{
Type: 0,
IntVal: 9333,
},
Scheme: "http",
},
},
InitialDelaySeconds: 20,
TimeoutSeconds: 0,
PeriodSeconds: 10,
SuccessThreshold: 1,
FailureThreshold: 6,
},
*/
}},
},
},
},
}
// Set master instance as the owner and controller
// ctrl.SetControllerReference(m, dep, r.Scheme)
return dep
}

View File

@ -1,82 +0,0 @@
package controllers
import (
"context"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) ensureS3Servers(seaweedCR *seaweedv1.Seaweed) (done bool, result ctrl.Result, err error) {
_ = context.Background()
_ = r.Log.WithValues("seaweed", seaweedCR.Name)
if done, result, err = r.ensureS3Deployment(seaweedCR); done {
return done, result, err
}
if done, result, err = r.ensureS3Service(seaweedCR); done {
return done, result, err
}
return false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) ensureS3Deployment(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("sw-s3-statefulset", seaweedCR.Name)
s3Deployment := &appsv1.Deployment{}
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-s3", Namespace: seaweedCR.Namespace}, s3Deployment)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.createS3Deployment(seaweedCR)
log.Info("Creating a new s3 deployment", "Namespace", dep.Namespace, "Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create new s3 statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
return true, ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return false, ctrl.Result{}, nil
} else if err != nil {
log.Error(err, "Failed to get s3 statefulset")
return true, ctrl.Result{}, err
}
log.Info("Get s3 stateful set " + s3Deployment.Name)
return false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) ensureS3Service(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("sw-filer-service", seaweedCR.Name)
s3Service := &corev1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-s3", Namespace: seaweedCR.Namespace}, s3Service)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.createS3Service(seaweedCR)
log.Info("Creating a new s3 service", "Namespace", dep.Namespace, "Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create new s3 service", "Namespace", dep.Namespace, "Name", dep.Name)
return true, ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return false, ctrl.Result{}, nil
} else if err != nil {
log.Error(err, "Failed to get s3 server service")
return true, ctrl.Result{}, err
}
log.Info("Get s3 service " + s3Service.Name)
return false, ctrl.Result{}, nil
}
func labelsForS3(name string) map[string]string {
return map[string]string{"app": "seaweedfs", "role": "s3", "name": name}
}

View File

@ -1,123 +0,0 @@
package controllers
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) createS3Deployment(m *seaweedv1.Seaweed) *appsv1.Deployment {
labels := labelsForS3(m.Name)
replicas := int32(m.Spec.S3Count)
enableServiceLinks := false
dep := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-s3",
Namespace: m.Namespace,
},
Spec: appsv1.DeploymentSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
EnableServiceLinks: &enableServiceLinks,
Containers: []corev1.Container{{
Name: "seaweedfs",
Image: "chrislusf/seaweedfs:latest",
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{
{
Name: "POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
},
Command: []string{
"/bin/sh",
"-ec",
fmt.Sprintf("weed s3 -port=8333 %s",
fmt.Sprintf("-filer=$(POD_NAME).%s-filer:8888", m.Name),
),
},
Ports: []corev1.ContainerPort{
{
ContainerPort: 8333,
Name: "swfs-s3",
},
{
ContainerPort: 18333,
},
},
/*
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.IntOrString{
Type: 0,
IntVal: 9333,
},
Scheme: "http",
},
},
InitialDelaySeconds: 5,
TimeoutSeconds: 0,
PeriodSeconds: 15,
SuccessThreshold: 2,
FailureThreshold: 100,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.IntOrString{
Type: 0,
IntVal: 9333,
},
Scheme: "http",
},
},
InitialDelaySeconds: 20,
TimeoutSeconds: 0,
PeriodSeconds: 10,
SuccessThreshold: 1,
FailureThreshold: 6,
},
*/
}},
},
},
},
}
return dep
}

View File

@ -1,36 +0,0 @@
package controllers
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) createS3Service(m *seaweedv1.Seaweed) *corev1.Service {
labels := labelsForS3(m.Name)
dep := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-s3",
Namespace: m.Namespace,
Labels: labels,
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "swfs-s3",
Protocol: corev1.Protocol("TCP"),
Port: 8333,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 8333,
},
},
},
Selector: labels,
},
}
return dep
}

View File

@ -1,82 +0,0 @@
package controllers
import (
"context"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) ensureVolumeServers(seaweedCR *seaweedv1.Seaweed) (done bool, result ctrl.Result, err error) {
_ = context.Background()
_ = r.Log.WithValues("seaweed", seaweedCR.Name)
if done, result, err = r.ensureVolumeServerStatefulSet(seaweedCR); done {
return done, result, err
}
if done, result, err = r.ensureVolumeServerService(seaweedCR); done {
return done, result, err
}
return false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) ensureVolumeServerStatefulSet(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("sw-volume-statefulset", seaweedCR.Name)
volumeServerStatefulSet := &appsv1.StatefulSet{}
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-volume", Namespace: seaweedCR.Namespace}, volumeServerStatefulSet)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.createVolumeServerStatefulSet(seaweedCR)
log.Info("Creating a new volume statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create new volume statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
return true, ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return false, ctrl.Result{}, nil
} else if err != nil {
log.Error(err, "Failed to get volume server statefulset")
return true, ctrl.Result{}, err
}
log.Info("Get volume stateful set " + volumeServerStatefulSet.Name)
return false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) ensureVolumeServerService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("sw-volume-service", seaweedCR.Name)
volumeServerService := &corev1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-volume", Namespace: seaweedCR.Namespace}, volumeServerService)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.createVolumeServerService(seaweedCR)
log.Info("Creating a new volume service", "Namespace", dep.Namespace, "Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create new volume service", "Namespace", dep.Namespace, "Name", dep.Name)
return true, ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return false, ctrl.Result{}, nil
} else if err != nil {
log.Error(err, "Failed to get volume server service")
return true, ctrl.Result{}, err
}
log.Info("Get volume service " + volumeServerService.Name)
return false, ctrl.Result{}, nil
}
func labelsForVolumeServer(name string) map[string]string {
return map[string]string{"app": "seaweedfs", "role": "volume", "name": name}
}

View File

@ -1,50 +0,0 @@
package controllers
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) createVolumeServerService(m *seaweedv1.Seaweed) *corev1.Service {
labels := labelsForVolumeServer(m.Name)
dep := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-volume",
Namespace: m.Namespace,
Labels: labels,
Annotations: map[string]string{
"service.alpha.kubernetes.io/tolerate-unready-endpoints": "true",
},
},
Spec: corev1.ServiceSpec{
ClusterIP: "None",
PublishNotReadyAddresses: true,
Ports: []corev1.ServicePort{
{
Name: "swfs-volume",
Protocol: corev1.Protocol("TCP"),
Port: 8444,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 8444,
},
},
{
Name: "swfs-volume-grpc",
Protocol: corev1.Protocol("TCP"),
Port: 18444,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 18444,
},
},
},
Selector: labels,
},
}
return dep
}

View File

@ -1,134 +0,0 @@
package controllers
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
func (r *SeaweedReconciler) createVolumeServerStatefulSet(m *seaweedv1.Seaweed) *appsv1.StatefulSet {
labels := labelsForVolumeServer(m.Name)
replicas := int32(m.Spec.VolumeServerCount)
rollingUpdatePartition := int32(0)
enableServiceLinks := false
dep := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-volume",
Namespace: m.Namespace,
},
Spec: appsv1.StatefulSetSpec{
ServiceName: m.Name + "-volume",
PodManagementPolicy: appsv1.ParallelPodManagement,
Replicas: &replicas,
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{
Partition: &rollingUpdatePartition,
},
},
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
EnableServiceLinks: &enableServiceLinks,
Containers: []corev1.Container{{
Name: "seaweedfs",
Image: "chrislusf/seaweedfs:latest",
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{
{
Name: "POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
},
Command: []string{
"/bin/sh",
"-ec",
fmt.Sprintf("weed volume -port=8444 -max=0 %s %s",
fmt.Sprintf("-ip=$(POD_NAME).%s-volume", m.Name),
fmt.Sprintf("-mserver=%s-master-0.%s-master:9333,%s-master-1.%s-master:9333,%s-master-2.%s-master:9333",
m.Name, m.Name, m.Name, m.Name, m.Name, m.Name),
),
},
Ports: []corev1.ContainerPort{
{
ContainerPort: 8444,
Name: "swfs-volume",
},
{
ContainerPort: 18444,
},
},
/*
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.IntOrString{
Type: 0,
IntVal: 9333,
},
Scheme: "http",
},
},
InitialDelaySeconds: 5,
TimeoutSeconds: 0,
PeriodSeconds: 15,
SuccessThreshold: 2,
FailureThreshold: 100,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.IntOrString{
Type: 0,
IntVal: 9333,
},
Scheme: "http",
},
},
InitialDelaySeconds: 20,
TimeoutSeconds: 0,
PeriodSeconds: 10,
SuccessThreshold: 1,
FailureThreshold: 6,
},
*/
}},
},
},
},
}
return dep
}

View File

@ -0,0 +1,181 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"context"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"reflect"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
// MasterReconciler reconciles a Master object
type MasterReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=seaweed.seaweedfs.com,resources=masters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=seaweed.seaweedfs.com,resources=masters/status,verbs=get;update;patch
func (r *MasterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("master", req.NamespacedName)
// Fetch the Master instance
master := &seaweedv1.Master{}
err := r.Get(ctx, req.NamespacedName, master)
if err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
// Return and don't requeue
log.Info("Master resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
log.Error(err, "Failed to get Master")
return ctrl.Result{}, err
}
// Check if the deployment already exists, if not create a new one
found := &appsv1.Deployment{}
err = r.Get(ctx, types.NamespacedName{Name: master.Name, Namespace: master.Namespace}, found)
if err != nil && errors.IsNotFound(err) {
// Define a new deployment
dep := r.deploymentForMaster(master)
log.Info("Creating a new Deployment", "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
err = r.Create(ctx, dep)
if err != nil {
log.Error(err, "Failed to create new Deployment", "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
return ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return ctrl.Result{Requeue: true}, nil
} else if err != nil {
log.Error(err, "Failed to get Deployment")
return ctrl.Result{}, err
}
// Ensure the deployment size is the same as the spec
size := master.Spec.Size
if *found.Spec.Replicas != size {
found.Spec.Replicas = &size
err = r.Update(ctx, found)
if err != nil {
log.Error(err, "Failed to update Deployment", "Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
return ctrl.Result{}, err
}
// Spec updated - return and requeue
return ctrl.Result{Requeue: true}, nil
}
// Update the Master status with the pod names
// List the pods for this master's deployment
podList := &corev1.PodList{}
listOpts := []client.ListOption{
client.InNamespace(master.Namespace),
client.MatchingLabels(labelsForMaster(master.Name)),
}
if err = r.List(ctx, podList, listOpts...); err != nil {
log.Error(err, "Failed to list pods", "Master.Namespace", master.Namespace, "Master.Name", master.Name)
return ctrl.Result{}, err
}
podNames := getPodNames(podList.Items)
// Update status.Nodes if needed
if !reflect.DeepEqual(podNames, master.Status.Nodes) {
master.Status.Nodes = podNames
err := r.Status().Update(ctx, master)
if err != nil {
log.Error(err, "Failed to update Master status")
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
// deploymentForMaster returns a master Deployment object
func (r *MasterReconciler) deploymentForMaster(m *seaweedv1.Master) *appsv1.Deployment {
ls := labelsForMaster(m.Name)
replicas := m.Spec.Size
dep := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name,
Namespace: m.Namespace,
},
Spec: appsv1.DeploymentSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: ls,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: ls,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Image: "Master:1.4.36-alpine",
Name: "Master",
Command: []string{"Master", "-m=64", "-o", "modern", "-v"},
Ports: []corev1.ContainerPort{{
ContainerPort: 11211,
Name: "Master",
}},
}},
},
},
},
}
// Set Master instance as the owner and controller
ctrl.SetControllerReference(m, dep, r.Scheme)
return dep
}
// labelsForMaster returns the labels for selecting the resources
// belonging to the given Master CR name.
func labelsForMaster(name string) map[string]string {
return map[string]string{"app": "Master", "Master_cr": name}
}
// getPodNames returns the pod names of the array of pods passed in
func getPodNames(pods []corev1.Pod) []string {
var podNames []string
for _, pod := range pods {
podNames = append(podNames, pod.Name)
}
return podNames
}
func (r *MasterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&seaweedv1.Master{}).
Complete(r)
}

View File

@ -1,107 +0,0 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"time"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
// SeaweedReconciler reconciles a Seaweed object
type SeaweedReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=seaweed.seaweedfs.com,resources=seaweeds,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=seaweed.seaweedfs.com,resources=seaweeds/status,verbs=get;update;patch
func (r *SeaweedReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("seaweed", req.NamespacedName)
log.Info("start Reconcile ...")
seaweedCR, done, result, err := r.findSeaweedCustomResourceInstance(ctx, log, req)
if done {
return result, err
}
// temporary
if seaweedCR.Spec.VolumeServerCount == 0 {
seaweedCR.Spec.VolumeServerCount = 1
}
if seaweedCR.Spec.FilerCount == 0 {
seaweedCR.Spec.FilerCount = 1
}
if seaweedCR.Spec.S3Count == 0 {
seaweedCR.Spec.S3Count = 1
}
if done, result, err = r.ensureMaster(seaweedCR); done {
return result, err
}
if done, result, err = r.ensureVolumeServers(seaweedCR); done {
return result, err
}
if done, result, err = r.ensureFilerServers(seaweedCR); done {
return result, err
}
if done, result, err = r.ensureS3Servers(seaweedCR); done {
return result, err
}
return ctrl.Result{}, nil
}
func (r *SeaweedReconciler) findSeaweedCustomResourceInstance(ctx context.Context, log logr.Logger, req ctrl.Request) (*seaweedv1.Seaweed, bool, ctrl.Result, error) {
// fetch the master instance
seaweedCR := &seaweedv1.Seaweed{}
err := r.Get(ctx, req.NamespacedName, seaweedCR)
if err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
// Return and don't requeue
log.Info("Seaweed CR not found. Ignoring since object must be deleted")
return nil, true, ctrl.Result{RequeueAfter: time.Second * 5}, nil
}
// Error reading the object - requeue the request.
log.Error(err, "Failed to get SeaweedCR")
return nil, true, ctrl.Result{}, err
}
log.Info("Get master " + seaweedCR.Name)
return seaweedCR, false, ctrl.Result{}, nil
}
func (r *SeaweedReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&seaweedv1.Seaweed{}).
Complete(r)
}

1
go.mod
View File

@ -6,7 +6,6 @@ require (
github.com/go-logr/logr v0.1.0
github.com/onsi/ginkgo v1.11.0
github.com/onsi/gomega v1.8.1
k8s.io/api v0.18.2
k8s.io/apimachinery v0.18.2
k8s.io/client-go v0.18.2
sigs.k8s.io/controller-runtime v0.6.0

View File

@ -67,12 +67,12 @@ func main() {
os.Exit(1)
}
if err = (&controllers.SeaweedReconciler{
if err = (&controllers.MasterReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("Seaweed"),
Log: ctrl.Log.WithName("controllers").WithName("Master"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Seaweed")
setupLog.Error(err, "unable to create controller", "controller", "Master")
os.Exit(1)
}
// +kubebuilder:scaffold:builder

View File

@ -1,15 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="CheckStyle-IDEA-Module">
<option name="configuration">
<map />
</option>
</component>
<component name="Go" enabled="true" />
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>