Merge pull request #5 from howardlau1999/refactor

Use accessor to merge config and extract magic numbers
This commit is contained in:
Chris Lu 2020-10-28 00:40:24 -07:00 committed by GitHub
commit c06204b5b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 664 additions and 291 deletions

View File

@ -0,0 +1,216 @@
package v1
import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)
// ComponentAccessor is the interface to access component details, which respects the cluster-level properties
// and component-level overrides
// +kubebuilder:object:root=false
// +kubebuilder:object:generate=false
type ComponentAccessor interface {
ImagePullPolicy() corev1.PullPolicy
ImagePullSecrets() []corev1.LocalObjectReference
HostNetwork() bool
Affinity() *corev1.Affinity
PriorityClassName() *string
NodeSelector() map[string]string
Annotations() map[string]string
Tolerations() []corev1.Toleration
SchedulerName() string
DNSPolicy() corev1.DNSPolicy
BuildPodSpec() corev1.PodSpec
Env() []corev1.EnvVar
AdditionalContainers() []corev1.Container
AdditionalVolumes() []corev1.Volume
TerminationGracePeriodSeconds() *int64
StatefulSetUpdateStrategy() appsv1.StatefulSetUpdateStrategyType
}
type componentAccessorImpl struct {
imagePullPolicy corev1.PullPolicy
imagePullSecrets []corev1.LocalObjectReference
hostNetwork *bool
affinity *corev1.Affinity
priorityClassName *string
schedulerName string
clusterNodeSelector map[string]string
clusterAnnotations map[string]string
tolerations []corev1.Toleration
statefulSetUpdateStrategy appsv1.StatefulSetUpdateStrategyType
// ComponentSpec is the Component Spec
ComponentSpec *ComponentSpec
}
func (a *componentAccessorImpl) StatefulSetUpdateStrategy() appsv1.StatefulSetUpdateStrategyType {
strategy := a.ComponentSpec.StatefulSetUpdateStrategy
if len(strategy) != 0 {
return strategy
}
strategy = a.statefulSetUpdateStrategy
if len(strategy) != 0 {
return strategy
}
return appsv1.RollingUpdateStatefulSetStrategyType
}
func (a *componentAccessorImpl) ImagePullPolicy() corev1.PullPolicy {
pp := a.ComponentSpec.ImagePullPolicy
if pp == nil {
return a.imagePullPolicy
}
return *pp
}
func (a *componentAccessorImpl) ImagePullSecrets() []corev1.LocalObjectReference {
ips := a.ComponentSpec.ImagePullSecrets
if ips == nil {
return a.imagePullSecrets
}
return ips
}
func (a *componentAccessorImpl) HostNetwork() bool {
hostNetwork := a.ComponentSpec.HostNetwork
if hostNetwork == nil {
hostNetwork = a.hostNetwork
}
if hostNetwork == nil {
return false
}
return *hostNetwork
}
func (a *componentAccessorImpl) Affinity() *corev1.Affinity {
affi := a.ComponentSpec.Affinity
if affi == nil {
affi = a.affinity
}
return affi
}
func (a *componentAccessorImpl) PriorityClassName() *string {
pcn := a.ComponentSpec.PriorityClassName
if pcn == nil {
pcn = a.priorityClassName
}
return pcn
}
func (a *componentAccessorImpl) SchedulerName() string {
pcn := a.ComponentSpec.SchedulerName
if pcn == nil {
pcn = &a.schedulerName
}
return *pcn
}
func (a *componentAccessorImpl) NodeSelector() map[string]string {
sel := map[string]string{}
for k, v := range a.clusterNodeSelector {
sel[k] = v
}
for k, v := range a.ComponentSpec.NodeSelector {
sel[k] = v
}
return sel
}
func (a *componentAccessorImpl) Annotations() map[string]string {
anno := map[string]string{}
for k, v := range a.clusterAnnotations {
anno[k] = v
}
for k, v := range a.ComponentSpec.Annotations {
anno[k] = v
}
return anno
}
func (a *componentAccessorImpl) Tolerations() []corev1.Toleration {
tols := a.ComponentSpec.Tolerations
if len(tols) == 0 {
tols = a.tolerations
}
return tols
}
func (a *componentAccessorImpl) DNSPolicy() corev1.DNSPolicy {
dnsPolicy := corev1.DNSClusterFirst // same as kubernetes default
if a.HostNetwork() {
dnsPolicy = corev1.DNSClusterFirstWithHostNet
}
return dnsPolicy
}
func (a *componentAccessorImpl) BuildPodSpec() corev1.PodSpec {
spec := corev1.PodSpec{
SchedulerName: a.SchedulerName(),
Affinity: a.Affinity(),
NodeSelector: a.NodeSelector(),
HostNetwork: a.HostNetwork(),
RestartPolicy: corev1.RestartPolicyAlways,
Tolerations: a.Tolerations(),
}
if a.PriorityClassName() != nil {
spec.PriorityClassName = *a.PriorityClassName()
}
if a.ImagePullSecrets() != nil {
spec.ImagePullSecrets = a.ImagePullSecrets()
}
if a.TerminationGracePeriodSeconds() != nil {
spec.TerminationGracePeriodSeconds = a.TerminationGracePeriodSeconds()
}
return spec
}
func (a *componentAccessorImpl) Env() []corev1.EnvVar {
return a.ComponentSpec.Env
}
func (a *componentAccessorImpl) AdditionalContainers() []corev1.Container {
return a.ComponentSpec.AdditionalContainers
}
func (a *componentAccessorImpl) AdditionalVolumes() []corev1.Volume {
return a.ComponentSpec.AdditionalVolumes
}
func (a *componentAccessorImpl) TerminationGracePeriodSeconds() *int64 {
return a.ComponentSpec.TerminationGracePeriodSeconds
}
func buildSeaweedComponentAccessor(spec *SeaweedSpec, componentSpec *ComponentSpec) ComponentAccessor {
return &componentAccessorImpl{
imagePullPolicy: spec.ImagePullPolicy,
imagePullSecrets: spec.ImagePullSecrets,
hostNetwork: spec.HostNetwork,
affinity: spec.Affinity,
schedulerName: spec.SchedulerName,
clusterNodeSelector: spec.NodeSelector,
clusterAnnotations: spec.Annotations,
tolerations: spec.Tolerations,
statefulSetUpdateStrategy: spec.StatefulSetUpdateStrategy,
ComponentSpec: componentSpec,
}
}
// BaseMasterSpec provides merged spec of masters
func (s *Seaweed) BaseMasterSpec() ComponentAccessor {
return buildSeaweedComponentAccessor(&s.Spec, &s.Spec.Master.ComponentSpec)
}
// BaseFilerSpec provides merged spec of filers
func (s *Seaweed) BaseFilerSpec() ComponentAccessor {
return buildSeaweedComponentAccessor(&s.Spec, &s.Spec.Filer.ComponentSpec)
}
// BaseVolumeSpec provides merged spec of volumes
func (s *Seaweed) BaseVolumeSpec() ComponentAccessor {
return buildSeaweedComponentAccessor(&s.Spec, &s.Spec.Volume.ComponentSpec)
}

View File

@ -25,6 +25,20 @@ import (
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// Constants
const (
GRPCPortDelta = 10000
MasterHTTPPort = 9333
VolumeHTTPPort = 8444
FilerHTTPPort = 8888
FilerS3Port = 8333
MasterGRPCPort = MasterHTTPPort + GRPCPortDelta
VolumeGRPCPort = VolumeHTTPPort + GRPCPortDelta
FilerGRPCPort = FilerHTTPPort + GRPCPortDelta
)
// SeaweedSpec defines the desired state of Seaweed
type SeaweedSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
@ -41,14 +55,51 @@ type SeaweedSpec struct {
// Master
Master *MasterSpec `json:"master,omitempty"`
// Volume
Volume *VolumeSpec `json:"volume,omitempty"`
// Filer
Filer *FilerSpec `json:"filer,omitempty"`
// SchedulerName of pods
SchedulerName string `json:"schedulerName,omitempty"`
// Persistent volume reclaim policy
PVReclaimPolicy *corev1.PersistentVolumeReclaimPolicy `json:"pvReclaimPolicy,omitempty"`
// ImagePullPolicy of pods
ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
// ImagePullSecrets is an optional list of references to secrets in the same namespace to use for pulling any of the images.
ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
// Whether enable PVC reclaim for orphan PVC left by statefulset scale-in
EnablePVReclaim *bool `json:"enablePVReclaim,omitempty"`
// Whether Hostnetwork is enabled for pods
HostNetwork *bool `json:"hostNetwork,omitempty"`
// Affinity of pods
Affinity *corev1.Affinity `json:"affinity,omitempty"`
// Base node selectors of Pods, components may add or override selectors upon this respectively
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
// Base annotations of Pods, components may add or override selectors upon this respectively
Annotations map[string]string `json:"annotations,omitempty"`
// Base tolerations of Pods, components may add more tolerations upon this respectively
Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
// StatefulSetUpdateStrategy indicates the StatefulSetUpdateStrategy that will be
// employed to update Pods in the StatefulSet when a revision is made to
// Template.
StatefulSetUpdateStrategy appsv1.StatefulSetUpdateStrategyType `json:"statefulSetUpdateStrategy,omitempty"`
VolumeServerDiskCount int32 `json:"volumeServerDiskCount,omitempty"`
// ingress
// Ingresses
Hosts []string `json:"hosts"`
}
@ -65,7 +116,8 @@ type MasterSpec struct {
// The desired ready replicas
// +kubebuilder:validation:Minimum=1
Replicas int32 `json:"replicas"`
Replicas int32 `json:"replicas"`
Service *ServiceSpec `json:"service,omitempty"`
}
// VolumeSpec is the spec for volume servers
@ -75,7 +127,8 @@ type VolumeSpec struct {
// The desired ready replicas
// +kubebuilder:validation:Minimum=1
Replicas int32 `json:"replicas"`
Replicas int32 `json:"replicas"`
Service *ServiceSpec `json:"service,omitempty"`
}
// FilerSpec is the spec for filers
@ -85,7 +138,8 @@ type FilerSpec struct {
// The desired ready replicas
// +kubebuilder:validation:Minimum=1
Replicas int32 `json:"replicas"`
Replicas int32 `json:"replicas"`
Service *ServiceSpec `json:"service,omitempty"`
}
// ComponentSpec is the base spec of each component, the fields should always accessed by the Basic<Component>Spec() method to respect the cluster-level properties
@ -122,16 +176,9 @@ type ComponentSpec struct {
// List of environment variables to set in the container, like
// v1.Container.Env.
// Note that following env names cannot be used and may be overrided by
// tidb-operator built envs.
// Note that following env names cannot be used and may be overrided by operators
// - NAMESPACE
// - TZ
// - SERVICE_NAME
// - PEER_SERVICE_NAME
// - HEADLESS_SERVICE_NAME
// - SET_NAME
// - HOSTNAME
// - CLUSTER_NAME
// - POD_IP
// - POD_NAME
Env []corev1.EnvVar `json:"env,omitempty"`
@ -157,6 +204,20 @@ type ComponentSpec struct {
StatefulSetUpdateStrategy appsv1.StatefulSetUpdateStrategyType `json:"statefulSetUpdateStrategy,omitempty"`
}
type ServiceSpec struct {
// Type of the real kubernetes service
Type corev1.ServiceType `json:"type,omitempty"`
// Additional annotations of the kubernetes service object
Annotations map[string]string `json:"annotations,omitempty"`
// LoadBalancerIP is the loadBalancerIP of service
LoadBalancerIP *string `json:"loadBalancerIP,omitempty"`
// ClusterIP is the clusterIP of service
ClusterIP *string `json:"clusterIP,omitempty"`
}
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status

View File

@ -127,6 +127,11 @@ func (in *FilerSpec) DeepCopyInto(out *FilerSpec) {
*out = *in
in.ComponentSpec.DeepCopyInto(&out.ComponentSpec)
in.ResourceRequirements.DeepCopyInto(&out.ResourceRequirements)
if in.Service != nil {
in, out := &in.Service, &out.Service
*out = new(ServiceSpec)
(*in).DeepCopyInto(*out)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FilerSpec.
@ -144,6 +149,11 @@ func (in *MasterSpec) DeepCopyInto(out *MasterSpec) {
*out = *in
in.ComponentSpec.DeepCopyInto(&out.ComponentSpec)
in.ResourceRequirements.DeepCopyInto(&out.ResourceRequirements)
if in.Service != nil {
in, out := &in.Service, &out.Service
*out = new(ServiceSpec)
(*in).DeepCopyInto(*out)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MasterSpec.
@ -233,6 +243,52 @@ func (in *SeaweedSpec) DeepCopyInto(out *SeaweedSpec) {
*out = new(FilerSpec)
(*in).DeepCopyInto(*out)
}
if in.PVReclaimPolicy != nil {
in, out := &in.PVReclaimPolicy, &out.PVReclaimPolicy
*out = new(corev1.PersistentVolumeReclaimPolicy)
**out = **in
}
if in.ImagePullSecrets != nil {
in, out := &in.ImagePullSecrets, &out.ImagePullSecrets
*out = make([]corev1.LocalObjectReference, len(*in))
copy(*out, *in)
}
if in.EnablePVReclaim != nil {
in, out := &in.EnablePVReclaim, &out.EnablePVReclaim
*out = new(bool)
**out = **in
}
if in.HostNetwork != nil {
in, out := &in.HostNetwork, &out.HostNetwork
*out = new(bool)
**out = **in
}
if in.Affinity != nil {
in, out := &in.Affinity, &out.Affinity
*out = new(corev1.Affinity)
(*in).DeepCopyInto(*out)
}
if in.NodeSelector != nil {
in, out := &in.NodeSelector, &out.NodeSelector
*out = make(map[string]string, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
if in.Annotations != nil {
in, out := &in.Annotations, &out.Annotations
*out = make(map[string]string, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
if in.Tolerations != nil {
in, out := &in.Tolerations, &out.Tolerations
*out = make([]corev1.Toleration, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.Hosts != nil {
in, out := &in.Hosts, &out.Hosts
*out = make([]string, len(*in))
@ -265,11 +321,48 @@ func (in *SeaweedStatus) DeepCopy() *SeaweedStatus {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ServiceSpec) DeepCopyInto(out *ServiceSpec) {
*out = *in
if in.Annotations != nil {
in, out := &in.Annotations, &out.Annotations
*out = make(map[string]string, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
if in.LoadBalancerIP != nil {
in, out := &in.LoadBalancerIP, &out.LoadBalancerIP
*out = new(string)
**out = **in
}
if in.ClusterIP != nil {
in, out := &in.ClusterIP, &out.ClusterIP
*out = new(string)
**out = **in
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ServiceSpec.
func (in *ServiceSpec) DeepCopy() *ServiceSpec {
if in == nil {
return nil
}
out := new(ServiceSpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *VolumeSpec) DeepCopyInto(out *VolumeSpec) {
*out = *in
in.ComponentSpec.DeepCopyInto(&out.ComponentSpec)
in.ResourceRequirements.DeepCopyInto(&out.ResourceRequirements)
if in.Service != nil {
in, out := &in.Service, &out.Service
*out = new(ServiceSpec)
(*in).DeepCopyInto(*out)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VolumeSpec.

View File

@ -2,6 +2,7 @@ package controllers
import (
"context"
"k8s.io/apimachinery/pkg/runtime"
appsv1 "k8s.io/api/apps/v1"
@ -18,7 +19,7 @@ func (r *SeaweedReconciler) ensureFilerServers(seaweedCR *seaweedv1.Seaweed) (do
return
}
if done, result, err = r.ensureFilerNodePortService(seaweedCR); done {
if done, result, err = r.ensureFilerService(seaweedCR); done {
return
}
@ -57,11 +58,11 @@ func (r *SeaweedReconciler) ensureFilerHeadlessService(seaweedCR *seaweedv1.Seaw
return ReconcileResult(err)
}
func (r *SeaweedReconciler) ensureFilerNodePortService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
func (r *SeaweedReconciler) ensureFilerService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
log := r.Log.WithValues("sw-filer-service", seaweedCR.Name)
filerService := r.createFilerNodePortService(seaweedCR)
filerService := r.createFilerService(seaweedCR)
_, err := r.CreateOrUpdateService(filerService)
log.Info("ensure filer service " + filerService.Name)

View File

@ -27,20 +27,20 @@ func (r *SeaweedReconciler) createFilerHeadlessService(m *seaweedv1.Seaweed) *co
{
Name: "swfs-filer",
Protocol: corev1.Protocol("TCP"),
Port: 8888,
TargetPort: intstr.FromInt(8888),
Port: seaweedv1.FilerHTTPPort,
TargetPort: intstr.FromInt(seaweedv1.FilerHTTPPort),
},
{
Name: "swfs-filer-grpc",
Protocol: corev1.Protocol("TCP"),
Port: 18888,
TargetPort: intstr.FromInt(18888),
Port: seaweedv1.FilerGRPCPort,
TargetPort: intstr.FromInt(seaweedv1.FilerGRPCPort),
},
{
Name: "swfs-s3",
Protocol: corev1.Protocol("TCP"),
Port: 8333,
TargetPort: intstr.FromInt(8333),
Port: seaweedv1.FilerS3Port,
TargetPort: intstr.FromInt(seaweedv1.FilerS3Port),
},
},
Selector: labels,
@ -49,7 +49,7 @@ func (r *SeaweedReconciler) createFilerHeadlessService(m *seaweedv1.Seaweed) *co
return dep
}
func (r *SeaweedReconciler) createFilerNodePortService(m *seaweedv1.Seaweed) *corev1.Service {
func (r *SeaweedReconciler) createFilerService(m *seaweedv1.Seaweed) *corev1.Service {
labels := labelsForFiler(m.Name)
dep := &corev1.Service{
@ -62,33 +62,47 @@ func (r *SeaweedReconciler) createFilerNodePortService(m *seaweedv1.Seaweed) *co
},
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
Type: corev1.ServiceTypeClusterIP,
PublishNotReadyAddresses: true,
Ports: []corev1.ServicePort{
{
Name: "swfs-filer",
Protocol: corev1.Protocol("TCP"),
Port: 8888,
TargetPort: intstr.FromInt(8888),
NodePort: 30888,
Port: seaweedv1.FilerHTTPPort,
TargetPort: intstr.FromInt(seaweedv1.FilerHTTPPort),
},
{
Name: "swfs-filer-grpc",
Protocol: corev1.Protocol("TCP"),
Port: 18888,
TargetPort: intstr.FromInt(18888),
NodePort: 31888,
Port: seaweedv1.FilerGRPCPort,
TargetPort: intstr.FromInt(seaweedv1.FilerGRPCPort),
},
{
Name: "swfs-s3",
Protocol: corev1.Protocol("TCP"),
Port: 8333,
TargetPort: intstr.FromInt(8333),
NodePort: 30833,
Port: seaweedv1.FilerS3Port,
TargetPort: intstr.FromInt(seaweedv1.FilerS3Port),
},
},
Selector: labels,
},
}
if m.Spec.Filer.Service != nil {
svcSpec := m.Spec.Filer.Service
dep.Annotations = copyAnnotations(svcSpec.Annotations)
if svcSpec.Type != "" {
dep.Spec.Type = svcSpec.Type
}
if svcSpec.ClusterIP != nil {
dep.Spec.ClusterIP = *svcSpec.ClusterIP
}
if svcSpec.LoadBalancerIP != nil {
dep.Spec.LoadBalancerIP = *svcSpec.LoadBalancerIP
}
}
return dep
}

View File

@ -17,6 +17,89 @@ func (r *SeaweedReconciler) createFilerStatefulSet(m *seaweedv1.Seaweed) *appsv1
rollingUpdatePartition := int32(0)
enableServiceLinks := false
filerPodSpec := m.BaseFilerSpec().BuildPodSpec()
filerPodSpec.EnableServiceLinks = &enableServiceLinks
filerPodSpec.Containers = []corev1.Container{{
Name: "seaweedfs",
Image: m.Spec.Image,
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{
{
Name: "POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
},
Command: []string{
"/bin/sh",
"-ec",
fmt.Sprintf("weed filer -port=8888 %s %s -s3",
fmt.Sprintf("-ip=$(POD_NAME).%s-filer", m.Name),
fmt.Sprintf("-master=%s", getMasterPeersString(m.Name, m.Spec.Master.Replicas)),
),
},
Ports: []corev1.ContainerPort{
{
ContainerPort: seaweedv1.FilerHTTPPort,
Name: "swfs-filer",
},
{
ContainerPort: seaweedv1.FilerGRPCPort,
},
{
ContainerPort: seaweedv1.FilerS3Port,
Name: "swfs-s3",
},
},
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/",
Port: intstr.FromInt(seaweedv1.FilerHTTPPort),
Scheme: corev1.URISchemeHTTP,
},
},
InitialDelaySeconds: 10,
TimeoutSeconds: 3,
PeriodSeconds: 15,
SuccessThreshold: 1,
FailureThreshold: 100,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/",
Port: intstr.FromInt(seaweedv1.FilerHTTPPort),
Scheme: corev1.URISchemeHTTP,
},
},
InitialDelaySeconds: 20,
TimeoutSeconds: 3,
PeriodSeconds: 30,
SuccessThreshold: 1,
FailureThreshold: 6,
},
}}
dep := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-filer",
@ -39,89 +122,7 @@ func (r *SeaweedReconciler) createFilerStatefulSet(m *seaweedv1.Seaweed) *appsv1
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
EnableServiceLinks: &enableServiceLinks,
Containers: []corev1.Container{{
Name: "seaweedfs",
Image: m.Spec.Image,
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{
{
Name: "POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
},
Command: []string{
"/bin/sh",
"-ec",
fmt.Sprintf("weed filer -port=8888 %s %s -s3",
fmt.Sprintf("-ip=$(POD_NAME).%s-filer", m.Name),
fmt.Sprintf("-master=%s", getMasterPeersString(m.Name, m.Spec.Master.Replicas)),
),
},
Ports: []corev1.ContainerPort{
{
ContainerPort: 8888,
Name: "swfs-filer",
},
{
ContainerPort: 18888,
},
{
ContainerPort: 8333,
Name: "swfs-s3",
},
},
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/",
Port: intstr.FromInt(8888),
Scheme: corev1.URISchemeHTTP,
},
},
InitialDelaySeconds: 10,
TimeoutSeconds: 3,
PeriodSeconds: 15,
SuccessThreshold: 1,
FailureThreshold: 100,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/",
Port: intstr.FromInt(8888),
Scheme: corev1.URISchemeHTTP,
},
},
InitialDelaySeconds: 20,
TimeoutSeconds: 3,
PeriodSeconds: 30,
SuccessThreshold: 1,
FailureThreshold: 6,
},
}},
},
Spec: filerPodSpec,
},
},
}

View File

@ -12,10 +12,6 @@ import (
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
)
const (
MasterClusterSize = 3
)
func (r *SeaweedReconciler) ensureMaster(seaweedCR *seaweedv1.Seaweed) (done bool, result ctrl.Result, err error) {
_ = context.Background()
_ = r.Log.WithValues("seaweed", seaweedCR.Name)

View File

@ -27,14 +27,14 @@ func (r *SeaweedReconciler) createMasterService(m *seaweedv1.Seaweed) *corev1.Se
{
Name: "swfs-master",
Protocol: corev1.Protocol("TCP"),
Port: 9333,
TargetPort: intstr.FromInt(9333),
Port: seaweedv1.MasterHTTPPort,
TargetPort: intstr.FromInt(seaweedv1.MasterHTTPPort),
},
{
Name: "swfs-master-grpc",
Protocol: corev1.Protocol("TCP"),
Port: 19333,
TargetPort: intstr.FromInt(19333),
Port: seaweedv1.MasterGRPCPort,
TargetPort: intstr.FromInt(seaweedv1.MasterGRPCPort),
},
},
Selector: labels,

View File

@ -17,6 +17,60 @@ func (r *SeaweedReconciler) createMasterStatefulSet(m *seaweedv1.Seaweed) *appsv
rollingUpdatePartition := int32(0)
enableServiceLinks := false
masterPodSpec := m.BaseMasterSpec().BuildPodSpec()
masterPodSpec.EnableServiceLinks = &enableServiceLinks
masterPodSpec.Containers = []corev1.Container{{
Name: "seaweedfs",
Image: m.Spec.Image,
ImagePullPolicy: corev1.PullIfNotPresent,
Env: append(m.BaseMasterSpec().Env(), kubernetesEnvVars...),
Command: []string{
"/bin/sh",
"-ec",
fmt.Sprintf("sleep 60; weed master -volumePreallocate -volumeSizeLimitMB=1000 %s %s",
fmt.Sprintf("-ip=$(POD_NAME).%s-master", m.Name),
fmt.Sprintf("-peers=%s", getMasterPeersString(m.Name, m.Spec.Master.Replicas)),
),
},
Ports: []corev1.ContainerPort{
{
ContainerPort: seaweedv1.MasterHTTPPort,
Name: "swfs-master",
},
{
ContainerPort: seaweedv1.MasterGRPCPort,
},
},
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.FromInt(seaweedv1.MasterHTTPPort),
Scheme: corev1.URISchemeHTTP,
},
},
InitialDelaySeconds: 5,
TimeoutSeconds: 15,
PeriodSeconds: 15,
SuccessThreshold: 2,
FailureThreshold: 100,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.FromInt(seaweedv1.MasterHTTPPort),
Scheme: corev1.URISchemeHTTP,
},
},
InitialDelaySeconds: 15,
TimeoutSeconds: 15,
PeriodSeconds: 15,
SuccessThreshold: 1,
FailureThreshold: 6,
},
}}
dep := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-master",
@ -39,86 +93,7 @@ func (r *SeaweedReconciler) createMasterStatefulSet(m *seaweedv1.Seaweed) *appsv
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Affinity: m.Spec.Master.Affinity,
EnableServiceLinks: &enableServiceLinks,
Containers: []corev1.Container{{
Name: "seaweedfs",
Image: m.Spec.Image,
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{
{
Name: "POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
},
Command: []string{
"/bin/sh",
"-ec",
fmt.Sprintf("sleep 60; weed master -volumePreallocate -volumeSizeLimitMB=1000 %s %s",
fmt.Sprintf("-ip=$(POD_NAME).%s-master", m.Name),
fmt.Sprintf("-peers=%s", getMasterPeersString(m.Name, m.Spec.Master.Replicas)),
),
},
Ports: []corev1.ContainerPort{
{
ContainerPort: 9333,
Name: "swfs-master",
},
{
ContainerPort: 19333,
},
},
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.FromInt(9333),
Scheme: corev1.URISchemeHTTP,
},
},
InitialDelaySeconds: 5,
TimeoutSeconds: 15,
PeriodSeconds: 15,
SuccessThreshold: 2,
FailureThreshold: 100,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/cluster/status",
Port: intstr.FromInt(9333),
Scheme: corev1.URISchemeHTTP,
},
},
InitialDelaySeconds: 15,
TimeoutSeconds: 15,
PeriodSeconds: 15,
SuccessThreshold: 1,
FailureThreshold: 6,
},
}},
},
Spec: masterPodSpec,
},
},
}

View File

@ -27,14 +27,14 @@ func (r *SeaweedReconciler) createVolumeServerService(m *seaweedv1.Seaweed) *cor
{
Name: "swfs-volume",
Protocol: corev1.Protocol("TCP"),
Port: 8444,
TargetPort: intstr.FromInt(8444),
Port: seaweedv1.VolumeHTTPPort,
TargetPort: intstr.FromInt(seaweedv1.VolumeHTTPPort),
},
{
Name: "swfs-volume-grpc",
Protocol: corev1.Protocol("TCP"),
Port: 18444,
TargetPort: intstr.FromInt(18444),
Port: seaweedv1.VolumeGRPCPort,
TargetPort: intstr.FromInt(seaweedv1.VolumeGRPCPort),
},
},
Selector: labels,

View File

@ -59,6 +59,63 @@ func (r *SeaweedReconciler) createVolumeServerStatefulSet(m *seaweedv1.Seaweed)
dirs = append(dirs, fmt.Sprintf("/data%d", i))
}
volumePodSpec := m.BaseVolumeSpec().BuildPodSpec()
volumePodSpec.EnableServiceLinks = &enableServiceLinks
volumePodSpec.Containers = []corev1.Container{{
Name: "seaweedfs",
Image: m.Spec.Image,
ImagePullPolicy: m.BaseVolumeSpec().ImagePullPolicy(),
Env: append(m.BaseVolumeSpec().Env(), kubernetesEnvVars...),
Command: []string{
"/bin/sh",
"-ec",
fmt.Sprintf("weed volume -port=8444 -max=0 %s %s %s",
fmt.Sprintf("-ip=$(POD_NAME).%s-volume", m.Name),
fmt.Sprintf("-dir=%s", strings.Join(dirs, ",")),
fmt.Sprintf("-mserver=%s", getMasterPeersString(m.Name, m.Spec.Master.Replicas)),
),
},
Ports: []corev1.ContainerPort{
{
ContainerPort: 8444,
Name: "swfs-volume",
},
{
ContainerPort: 18444,
},
},
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/status",
Port: intstr.FromInt(8444),
Scheme: corev1.URISchemeHTTP,
},
},
InitialDelaySeconds: 15,
TimeoutSeconds: 5,
PeriodSeconds: 90,
SuccessThreshold: 1,
FailureThreshold: 100,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/status",
Port: intstr.FromInt(8444),
Scheme: corev1.URISchemeHTTP,
},
},
InitialDelaySeconds: 20,
TimeoutSeconds: 5,
PeriodSeconds: 90,
SuccessThreshold: 1,
FailureThreshold: 6,
},
VolumeMounts: volumeMounts,
}}
volumePodSpec.Volumes = volumes
dep := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-volume",
@ -81,89 +138,7 @@ func (r *SeaweedReconciler) createVolumeServerStatefulSet(m *seaweedv1.Seaweed)
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
EnableServiceLinks: &enableServiceLinks,
Containers: []corev1.Container{{
Name: "seaweedfs",
Image: m.Spec.Image,
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{
{
Name: "POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
},
Command: []string{
"/bin/sh",
"-ec",
fmt.Sprintf("weed volume -port=8444 -max=0 %s %s %s",
fmt.Sprintf("-ip=$(POD_NAME).%s-volume", m.Name),
fmt.Sprintf("-dir=%s", strings.Join(dirs, ",")),
fmt.Sprintf("-mserver=%s", getMasterPeersString(m.Name, m.Spec.Master.Replicas)),
),
},
Ports: []corev1.ContainerPort{
{
ContainerPort: 8444,
Name: "swfs-volume",
},
{
ContainerPort: 18444,
},
},
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/status",
Port: intstr.FromInt(8444),
Scheme: corev1.URISchemeHTTP,
},
},
InitialDelaySeconds: 15,
TimeoutSeconds: 5,
PeriodSeconds: 90,
SuccessThreshold: 1,
FailureThreshold: 100,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/status",
Port: intstr.FromInt(8444),
Scheme: corev1.URISchemeHTTP,
},
},
InitialDelaySeconds: 20,
TimeoutSeconds: 5,
PeriodSeconds: 90,
SuccessThreshold: 1,
FailureThreshold: 6,
},
VolumeMounts: volumeMounts,
}},
Volumes: volumes,
},
Spec: volumePodSpec,
},
VolumeClaimTemplates: persistentVolumeClaims,
},

View File

@ -4,6 +4,7 @@ import (
"fmt"
"strings"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
)
@ -11,6 +12,35 @@ const (
masterPeerAddressPattern = "%s-master-%d.%s-master:9333"
)
var (
kubernetesEnvVars = []corev1.EnvVar{
{
Name: "POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
}
)
func ReconcileResult(err error) (bool, ctrl.Result, error) {
if err != nil {
return true, ctrl.Result{}, err
@ -29,3 +59,14 @@ func getMasterAddresses(name string, replicas int32) []string {
func getMasterPeersString(name string, replicas int32) string {
return strings.Join(getMasterAddresses(name, replicas), ",")
}
func copyAnnotations(src map[string]string) map[string]string {
if src == nil {
return nil
}
dst := map[string]string{}
for k, v := range src {
dst[k] = v
}
return dst
}