Skip to content
This repository was archived by the owner on Jan 9, 2023. It is now read-only.

Commit 9f83889

Browse files
committed
Use ssh package to save backups
Signed-off-by: JoshVanL <[email protected]>
1 parent ec962f2 commit 9f83889

File tree

13 files changed

+156
-119
lines changed

13 files changed

+156
-119
lines changed

cmd/tarmak/cmd/cluster_snapshot_etcd_restore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ var clusterSnapshotEtcdRestoreCmd = &cobra.Command{
1515
Short: "restore etcd cluster with source snapshot",
1616
PreRunE: func(cmd *cobra.Command, args []string) error {
1717
if len(args) != 1 {
18-
return fmt.Errorf("expecting single source path, got=%d", len(args))
18+
return fmt.Errorf("expecting single target path, got=%d", len(args))
1919
}
2020

2121
return nil

cmd/tarmak/cmd/cluster_snapshot_etcd_save.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
package cmd
33

44
import (
5-
"fmt"
6-
75
"github.com/spf13/cobra"
86

97
"github.com/jetstack/tarmak/pkg/tarmak"
@@ -13,14 +11,10 @@ import (
1311
var clusterSnapshotEtcdSaveCmd = &cobra.Command{
1412
Use: "save [target path prefix]",
1513
Short: "save etcd snapshot to target path prefix, i.e 'backup-'",
16-
PreRunE: func(cmd *cobra.Command, args []string) error {
17-
if len(args) != 1 {
18-
return fmt.Errorf("expecting single target path, got=%d", len(args))
19-
}
20-
21-
return nil
22-
},
2314
Run: func(cmd *cobra.Command, args []string) {
15+
if len(args) == 0 {
16+
args = []string{""}
17+
}
2418
t := tarmak.New(globalFlags)
2519
s := etcd.New(t, args[0])
2620
t.CancellationContext().WaitOrCancel(t.NewCmdSnapshot(cmd.Flags(), args, s).Save)

cmd/tarmak/cmd/cluster_ssh.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cmd
33

44
import (
55
"errors"
6+
"strings"
67

78
"github.com/spf13/cobra"
89

@@ -20,7 +21,7 @@ var clusterSshCmd = &cobra.Command{
2021
},
2122
Run: func(cmd *cobra.Command, args []string) {
2223
t := tarmak.New(globalFlags)
23-
t.Perform(t.SSHPassThrough(args[0], args[1:]))
24+
t.Perform(t.SSHPassThrough(args[0], strings.Join(args[1:], " ")))
2425
},
2526
}
2627

pkg/tarmak/environment/bastion.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (e *Environment) VerifyBastionAvailable() error {
6161
executeSSH := func() error {
6262
retCode, err := ssh.Execute(
6363
"bastion",
64-
[]string{"/bin/true"},
64+
"/bin/true",
6565
nil, nil, stderrW,
6666
)
6767

pkg/tarmak/interfaces/interfaces.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,9 @@ type Terraform interface {
208208

209209
type SSH interface {
210210
WriteConfig(Cluster) error
211-
PassThrough(host string, additionalArguments []string) error
211+
PassThrough(host string, additionalArguments string) error
212212
Tunnel(destination, destinationPort, localPort string, daemonize bool) Tunnel
213-
Execute(host string, cmd []string, stdin io.Reader, stdout, stderr io.Writer) (returnCode int, err error)
213+
Execute(host string, cmd string, stdin io.Reader, stdout, stderr io.Writer) (returnCode int, err error)
214214
Validate() error
215215
Cleanup()
216216
}
@@ -296,4 +296,6 @@ type CancellationContext interface {
296296
type Snapshot interface {
297297
Save() error
298298
Restore() error
299+
Log() *logrus.Entry
300+
SSH() SSH
299301
}

pkg/tarmak/snapshot/consul/consul.go

Lines changed: 26 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
package consul
33

44
import (
5-
"bufio"
65
"fmt"
76
"io"
87
"os"
@@ -55,25 +54,26 @@ func (c *Consul) Save() error {
5554

5655
c.log.Infof("saving snapshot from instance %s", aliases[0])
5756

58-
var result *multierror.Error
59-
var errLock sync.Mutex
60-
61-
reader, writer := io.Pipe()
62-
go snapshot.ReadTarFromStream(c.path, reader, result, errLock)
63-
6457
hostPath := fmt.Sprintf("/tmp/consul-snapshot-%s.snap",
6558
time.Now().Format(snapshot.TimeLayout))
66-
cmdArgs := append(envCmd,
67-
strings.Split(fmt.Sprintf(consulCmd, "save", hostPath), " ")...)
68-
cmdArgs = append(cmdArgs,
69-
strings.Split(fmt.Sprintf(snapshot.GZipCCmd, hostPath), " ")...)
70-
71-
err = c.sshCmd(
72-
aliases[0],
73-
cmdArgs,
74-
nil,
75-
writer,
76-
)
59+
cmdArgs := fmt.Sprintf(`
60+
export CONSUL_HTTP_TOKEN=$(sudo cat /etc/consul/consul.json | jq -r '.acl_master_token');
61+
export DATACENTER=$(sudo cat /etc/consul/consul.json | jq -r '.datacenter');
62+
/usr/local/bin/consul snapshot save -datacenter $DATACENTER %s;
63+
/usr/local/bin/consul snapshot inspect %s`, hostPath, hostPath)
64+
65+
err = snapshot.SSHCmd(c, aliases[0], cmdArgs, nil, nil, nil)
66+
if err != nil {
67+
return err
68+
}
69+
70+
reader, writer := io.Pipe()
71+
err = snapshot.TarFromStream(func() error {
72+
err := snapshot.SSHCmd(c, aliases[0], fmt.Sprintf(snapshot.GZipCCmd, hostPath),
73+
nil, writer, nil)
74+
writer.Close()
75+
return err
76+
}, reader, c.path)
7777
if err != nil {
7878
return err
7979
}
@@ -107,11 +107,11 @@ func (c *Consul) Restore() error {
107107
reader, writer := io.Pipe()
108108
go snapshot.WriteTarToStream(c.path, writer, result, errLock)
109109

110-
err = c.sshCmd(
111-
a,
112-
cmdArgs,
110+
err = snapshot.SSHCmd(c, a,
111+
strings.Join(cmdArgs, " "),
113112
reader,
114113
os.Stdout,
114+
nil,
115115
)
116116
if err != nil {
117117
return err
@@ -123,20 +123,10 @@ func (c *Consul) Restore() error {
123123
return nil
124124
}
125125

126-
func (c *Consul) sshCmd(host string, args []string, stdin io.Reader, stdout io.Writer) error {
127-
readerE, writerE := io.Pipe()
128-
scannerE := bufio.NewScanner(readerE)
129-
130-
go func() {
131-
for scannerE.Scan() {
132-
c.log.WithField("std", "err").Warn(scannerE.Text())
133-
}
134-
}()
135-
136-
ret, err := c.ssh.ExecuteWithPipe(host, args[0], args[1:], stdin, stdout, writerE)
137-
if ret != 0 {
138-
return fmt.Errorf("command [%s] returned non-zero: %d", strings.Join(args, " "), ret)
139-
}
126+
func (c *Consul) Log() *logrus.Entry {
127+
return c.log
128+
}
140129

141-
return err
130+
func (c *Consul) SSH() interfaces.SSH {
131+
return c.ssh
142132
}

pkg/tarmak/snapshot/etcd/etcd.go

Lines changed: 38 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
package etcd
33

44
import (
5-
"bufio"
5+
//"bufio"
66
"fmt"
77
"io"
88
"strings"
@@ -20,7 +20,7 @@ import (
2020
var _ interfaces.Snapshot = &Etcd{}
2121

2222
const (
23-
etcdctlCmd = "/opt/bin/etcdctl snapshot %s %s > /dev/null;"
23+
etcdctlCmd = `/opt/bin/etcdctl snapshot %s %s`
2424
)
2525

2626
var (
@@ -30,13 +30,13 @@ var (
3030
{"store": "overlay", "file": "overlay", "port": "2359"},
3131
}
3232

33-
envCmd = []string{
34-
"ETCDCTL_CERT=/etc/etcd/ssl/etcd-{{file}}.pem",
35-
"ETCDCTL_KEY=/etc/etcd/ssl/etcd-{{file}}-key.pem",
36-
"ETCDCTL_CACERT=/etc/etcd/ssl/etcd-{{file}}-ca.pem",
37-
"ETCDCTL_API=3",
38-
"ETCDCTL_ENDPOINTS=https://127.0.0.1:{{port}}",
39-
}
33+
envCmd = `
34+
export ETCDCTL_CERT=/etc/etcd/ssl/etcd-{{file}}.pem;
35+
export ETCDCTL_KEY=/etc/etcd/ssl/etcd-{{file}}-key.pem;
36+
export ETCDCTL_CACERT=/etc/etcd/ssl/etcd-{{file}}-ca.pem;
37+
export ETCDCTL_API=3;
38+
export ETCDCTL_ENDPOINTS=https://127.0.0.1:{{port}}
39+
`
4040
)
4141

4242
type Etcd struct {
@@ -75,23 +75,12 @@ func (e *Etcd) Save() error {
7575
saveFunc := func(store map[string]string) {
7676
defer wg.Done()
7777

78-
targetPath := fmt.Sprintf("%s%s.db", e.path, store["store"])
79-
80-
reader, writer := io.Pipe()
81-
go snapshot.ReadTarFromStream(targetPath, reader, result, errLock)
82-
8378
hostPath := fmt.Sprintf("/tmp/etcd-snapshot-%s-%s.db",
8479
store["store"], time.Now().Format(snapshot.TimeLayout))
85-
cmdArgs := append(e.template(envCmd, store),
86-
strings.Split(fmt.Sprintf(etcdctlCmd, "save", hostPath), " ")...)
87-
cmdArgs = append(cmdArgs,
88-
strings.Split(fmt.Sprintf(snapshot.GZipCCmd, hostPath), " ")...)
89-
90-
err = e.sshCmd(
91-
aliases[0],
92-
cmdArgs,
93-
writer,
94-
)
80+
81+
cmdArgs := fmt.Sprintf(`sudo /bin/bash -c "%s; %s"`, e.template(envCmd, store),
82+
fmt.Sprintf(etcdctlCmd, "save", hostPath))
83+
err = snapshot.SSHCmd(e, aliases[0], cmdArgs, nil, nil, nil)
9584
if err != nil {
9685

9786
errLock.Lock()
@@ -101,6 +90,22 @@ func (e *Etcd) Save() error {
10190
return
10291
}
10392

93+
targetPath := fmt.Sprintf("%s%s.db", e.path, store["store"])
94+
reader, writer := io.Pipe()
95+
err = snapshot.TarFromStream(func() error {
96+
err := snapshot.SSHCmd(e, aliases[0], fmt.Sprintf(snapshot.GZipCCmd, hostPath),
97+
nil, writer, nil)
98+
writer.Close()
99+
return err
100+
}, reader, targetPath)
101+
if err != nil {
102+
errLock.Lock()
103+
result = multierror.Append(result, err)
104+
errLock.Unlock()
105+
106+
return
107+
}
108+
104109
e.log.Infof("etcd %s snapshot saved to %s", store["store"], targetPath)
105110

106111
select {
@@ -111,7 +116,6 @@ func (e *Etcd) Save() error {
111116
}
112117

113118
wg.Add(len(stores))
114-
115119
for _, store := range stores {
116120
go saveFunc(store)
117121
}
@@ -137,31 +141,18 @@ func (e *Etcd) Restore() error {
137141
return nil
138142
}
139143

140-
func (e *Etcd) sshCmd(host string, args []string, stdout io.Writer) error {
141-
readerE, writerE := io.Pipe()
142-
scannerE := bufio.NewScanner(readerE)
143-
144-
go func() {
145-
for scannerE.Scan() {
146-
e.log.WithField("std", "err").Warn(scannerE.Text())
147-
}
148-
}()
149-
150-
args = append([]string{"sudo"}, args...)
151-
ret, err := e.ssh.ExecuteWithPipe(host, args[0], args[1:], nil, stdout, writerE)
152-
if ret != 0 {
153-
return fmt.Errorf("command [%s] returned non-zero: %d", strings.Join(args, " "), ret)
144+
func (e *Etcd) template(args string, vars map[string]string) string {
145+
for k, v := range vars {
146+
args = strings.Replace(args, fmt.Sprintf("{{%s}}", k), v, -1)
154147
}
155148

156-
return err
149+
return args
157150
}
158151

159-
func (e *Etcd) template(args []string, vars map[string]string) []string {
160-
for i := range args {
161-
for k, v := range vars {
162-
args[i] = strings.Replace(args[i], fmt.Sprintf("{{%s}}", k), v, -1)
163-
}
164-
}
152+
func (e *Etcd) Log() *logrus.Entry {
153+
return e.log
154+
}
165155

166-
return args
156+
func (e *Etcd) SSH() interfaces.SSH {
157+
return e.ssh
167158
}

0 commit comments

Comments
 (0)