Skip to content
This repository was archived by the owner on Jan 9, 2023. It is now read-only.

Commit d612a62

Browse files
committed
Attempt to tar both received and sent snapshots
Signed-off-by: JoshVanL <[email protected]>
1 parent 35f7c0c commit d612a62

File tree

5 files changed

+131
-114
lines changed

5 files changed

+131
-114
lines changed

pkg/tarmak/interfaces/interfaces.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,12 +210,10 @@ type SSH interface {
210210
WriteConfig(Cluster) error
211211
PassThrough([]string)
212212
Tunnel(hostname string, destination string, destinationPort int) Tunnel
213-
Execute(host string, cmd string, args []string) (returnCode int, err error)
214213
Validate() error
215214
Cleanup() error
216-
ExecuteWithWriter(host string, cmd string, args []string, stdout, stderr io.Writer) (returnCode int, err error)
217-
ScpToLocal(path, targetPath, localPath string) (retCode int, err error)
218-
ScpToHost(path, localPath, targetPath string) (retCode int, err error)
215+
Execute(host, cmd string, args []string) (returnCode int, err error)
216+
ExecuteWithPipe(host, cmd string, args []string, stdin io.Reader, stdout, stderr io.Writer) (returnCode int, err error)
219217
}
220218

221219
type Tunnel interface {

pkg/tarmak/snapshot/consul/consul.go

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ import (
55
"bufio"
66
"fmt"
77
"io"
8+
"os"
89
"strings"
10+
"sync"
911
"time"
1012

13+
"github.com/hashicorp/go-multierror"
1114
"github.com/sirupsen/logrus"
1215

1316
clusterv1alpha1 "github.com/jetstack/tarmak/pkg/apis/cluster/v1alpha1"
@@ -18,14 +21,11 @@ import (
1821
var _ interfaces.Snapshot = &Consul{}
1922

2023
const (
21-
snapshotTimeLayout = "2006-01-02_15-04-05"
24+
consulCmd = "consul snapshot %s %s > /dev/null;"
2225
)
2326

2427
var (
25-
exportCmd = []string{
26-
"export",
27-
"CONSUL_HTTP_TOKEN=$(sudo cat /etc/consul/consul.json | jq -r '.acl_master_token');",
28-
}
28+
envCmd = []string{"CONSUL_HTTP_TOKEN=$(sudo cat /etc/consul/consul.json | jq -r '.acl_master_token')"}
2929
)
3030

3131
type Consul struct {
@@ -54,27 +54,33 @@ func (c *Consul) Save() error {
5454
c.aliases = aliases
5555

5656
c.log.Infof("saving snapshot from instance %s", aliases[0])
57-
targetPath := fmt.Sprintf("/tmp/consul-snapshot-%s.snap", time.Now().Format(snapshotTimeLayout))
5857

59-
cmdArgs := append(exportCmd, "consul", "snapshot", "save", targetPath)
58+
var result *multierror.Error
59+
var errLock sync.Mutex
60+
61+
reader, writer := io.Pipe()
62+
go snapshot.ReadTarFromStream(c.path, reader, result, errLock)
63+
64+
hostPath := fmt.Sprintf("/tmp/consul-snapshot-%s.snap",
65+
time.Now().Format(snapshot.TimeLayout))
66+
cmdArgs := append(envCmd,
67+
strings.Split(fmt.Sprintf(consulCmd, "save", hostPath), " ")...)
68+
cmdArgs = append(cmdArgs,
69+
strings.Split(fmt.Sprintf(snapshot.TarCCmd, hostPath), " ")...)
70+
6071
err = c.sshCmd(
6172
aliases[0],
62-
cmdArgs[0],
63-
cmdArgs[1:],
73+
cmdArgs,
74+
nil,
75+
writer,
6476
)
6577
if err != nil {
6678
return err
6779
}
6880

69-
ret, err := c.ssh.ScpToLocal(aliases[0], targetPath, c.path)
70-
if ret != 0 {
71-
cmdStr := fmt.Sprintf("%s", strings.Join(cmdArgs, " "))
72-
return fmt.Errorf("command [%s] returned non-zero: %d, %s", cmdStr, ret, err)
73-
}
74-
7581
c.log.Infof("consul snapshot saved to %s", c.path)
7682

77-
return err
83+
return nil
7884
}
7985

8086
func (c *Consul) Restore() error {
@@ -86,18 +92,27 @@ func (c *Consul) Restore() error {
8692

8793
for _, a := range aliases {
8894
c.log.Infof("restoring snapshot to instance %s", a)
89-
targetPath := fmt.Sprintf("/tmp/consul-snapshot-%s.snap", time.Now().Format(snapshotTimeLayout))
9095

91-
ret, err := c.ssh.ScpToHost(a, c.path, targetPath)
92-
if ret != 0 {
93-
return fmt.Errorf("command scp returned non-zero: %d, %s", ret, err)
94-
}
96+
hostPath := fmt.Sprintf("/tmp/consul-snapshot-%s.snap",
97+
time.Now().Format(snapshot.TimeLayout))
98+
99+
cmdArgs := strings.Split(fmt.Sprintf(snapshot.TarXCmd, hostPath), " ")
100+
//cmdArgs := strings.Split(snapshot.TarXCmd, " ")
101+
//cmdArgs = append(cmdArgs,
102+
// append(envCmd,
103+
// strings.Split(fmt.Sprintf(consulCmd, "restore", hostPath), " ")...)...)
104+
105+
var result *multierror.Error
106+
var errLock sync.Mutex
107+
108+
reader, writer := io.Pipe()
109+
go snapshot.WriteTarToStream(c.path, writer, result, errLock)
95110

96-
cmdArgs := append(exportCmd, "consul", "snapshot", "restore", targetPath)
97111
err = c.sshCmd(
98112
a,
99-
cmdArgs[0],
100-
cmdArgs[1:],
113+
cmdArgs,
114+
reader,
115+
os.Stdout,
101116
)
102117
if err != nil {
103118
return err
@@ -106,31 +121,22 @@ func (c *Consul) Restore() error {
106121

107122
c.log.Infof("consul snapshot restored from %s", c.path)
108123

109-
return err
124+
return nil
110125
}
111126

112-
func (c *Consul) sshCmd(host, command string, args []string) error {
113-
readerO, writerO := io.Pipe()
127+
func (c *Consul) sshCmd(host string, args []string, stdin io.Reader, stdout io.Writer) error {
114128
readerE, writerE := io.Pipe()
115-
scannerO := bufio.NewScanner(readerO)
116129
scannerE := bufio.NewScanner(readerE)
117130

118-
go func() {
119-
for scannerO.Scan() {
120-
c.log.WithField("std", "out").Debug(scannerO.Text())
121-
}
122-
}()
123-
124131
go func() {
125132
for scannerE.Scan() {
126133
c.log.WithField("std", "err").Warn(scannerE.Text())
127134
}
128135
}()
129136

130-
ret, err := c.ssh.ExecuteWithWriter(host, command, args, writerO, writerE)
137+
ret, err := c.ssh.ExecuteWithPipe(host, args[0], args[1:], stdin, stdout, writerE)
131138
if ret != 0 {
132-
cmdStr := fmt.Sprintf("%s %s", command, strings.Join(args, " "))
133-
return fmt.Errorf("command [%s] returned non-zero: %d", cmdStr, ret)
139+
return fmt.Errorf("command [%s] returned non-zero: %d", strings.Join(args, " "), ret)
134140
}
135141

136142
return err

pkg/tarmak/snapshot/etcd/etcd.go

Lines changed: 13 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,12 @@ package etcd
33

44
import (
55
"bufio"
6-
"compress/gzip"
76
"fmt"
87
"io"
9-
"os"
108
"strings"
119
"sync"
1210
"time"
1311

14-
//"github.com/docker/docker/pkg/archive"
1512
"github.com/hashicorp/go-multierror"
1613
"github.com/sirupsen/logrus"
1714

@@ -23,10 +20,7 @@ import (
2320
var _ interfaces.Snapshot = &Etcd{}
2421

2522
const (
26-
snapshotTimeLayout = "2006-01-02_15-04-05"
27-
2823
etcdctlCmd = "/opt/bin/etcdctl snapshot %s %s > /dev/null;"
29-
tarCmd = "tar -czPf - %s"
3024
)
3125

3226
var (
@@ -36,7 +30,7 @@ var (
3630
{"store": "overlay", "file": "overlay", "port": "2359"},
3731
}
3832

39-
exportCmd = []string{
33+
envCmd = []string{
4034
"ETCDCTL_CERT=/etc/etcd/ssl/etcd-{{file}}.pem",
4135
"ETCDCTL_KEY=/etc/etcd/ssl/etcd-{{file}}-key.pem",
4236
"ETCDCTL_CACERT=/etc/etcd/ssl/etcd-{{file}}-ca.pem",
@@ -53,7 +47,6 @@ type Etcd struct {
5347

5448
path string
5549
aliases []string
56-
errLock sync.Mutex // prevent multiple writes to result error
5750
}
5851

5952
func New(tarmak interfaces.Tarmak, path string) *Etcd {
@@ -77,21 +70,22 @@ func (e *Etcd) Save() error {
7770

7871
var wg sync.WaitGroup
7972
var result *multierror.Error
73+
var errLock sync.Mutex
8074

8175
saveFunc := func(store map[string]string) {
8276
defer wg.Done()
8377

84-
hostPath := fmt.Sprintf("/tmp/etcd-snapshot-%s-%s.db",
85-
store["store"], time.Now().Format(snapshotTimeLayout))
8678
targetPath := fmt.Sprintf("%s%s.db", e.path, store["store"])
8779

88-
cmdArgs := append(e.template(exportCmd, store),
80+
reader, writer := io.Pipe()
81+
go snapshot.ReadTarFromStream(targetPath, reader, result, errLock)
82+
83+
hostPath := fmt.Sprintf("/tmp/etcd-snapshot-%s-%s.db",
84+
store["store"], time.Now().Format(snapshot.TimeLayout))
85+
cmdArgs := append(e.template(envCmd, store),
8986
strings.Split(fmt.Sprintf(etcdctlCmd, "save", hostPath), " ")...)
9087
cmdArgs = append(cmdArgs,
91-
strings.Split(fmt.Sprintf(tarCmd, hostPath), " ")...)
92-
93-
reader, writer := io.Pipe()
94-
go e.readTarFromStream(targetPath, reader, result)
88+
strings.Split(fmt.Sprintf(snapshot.TarCCmd, hostPath), " ")...)
9589

9690
err = e.sshCmd(
9791
aliases[0],
@@ -100,9 +94,9 @@ func (e *Etcd) Save() error {
10094
)
10195
if err != nil {
10296

103-
e.errLock.Lock()
97+
errLock.Lock()
10498
result = multierror.Append(result, err)
105-
e.errLock.Unlock()
99+
errLock.Unlock()
106100

107101
return
108102
}
@@ -154,46 +148,14 @@ func (e *Etcd) sshCmd(host string, args []string, stdout io.Writer) error {
154148
}()
155149

156150
args = append([]string{"sudo"}, args...)
157-
ret, err := e.ssh.ExecuteWithWriter(host, args[0], args[1:], stdout, writerE)
151+
ret, err := e.ssh.ExecuteWithPipe(host, args[0], args[1:], nil, stdout, writerE)
158152
if ret != 0 {
159-
cmdStr := fmt.Sprintf("%s", strings.Join(args, " "))
160-
return fmt.Errorf("command [%s] returned non-zero: %d", cmdStr, ret)
153+
return fmt.Errorf("command [%s] returned non-zero: %d", strings.Join(args, " "), ret)
161154
}
162155

163156
return err
164157
}
165158

166-
func (e *Etcd) readTarFromStream(dest string, stream io.Reader, result *multierror.Error) {
167-
gzr, err := gzip.NewReader(stream)
168-
if err != nil {
169-
170-
e.errLock.Lock()
171-
result = multierror.Append(result, err)
172-
e.errLock.Unlock()
173-
174-
return
175-
}
176-
177-
f, err := os.Create(dest)
178-
if err != nil {
179-
180-
e.errLock.Lock()
181-
result = multierror.Append(result, err)
182-
e.errLock.Unlock()
183-
184-
return
185-
}
186-
187-
if _, err := io.Copy(f, gzr); err != nil {
188-
189-
e.errLock.Lock()
190-
result = multierror.Append(result, err)
191-
e.errLock.Unlock()
192-
193-
return
194-
}
195-
}
196-
197159
func (e *Etcd) template(args []string, vars map[string]string) []string {
198160
for i := range args {
199161
for k, v := range vars {

pkg/tarmak/snapshot/snapshot.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,25 @@
22
package snapshot
33

44
import (
5+
"compress/gzip"
56
"fmt"
7+
"io"
8+
"os"
9+
"sync"
610

711
"github.com/hashicorp/go-multierror"
812

913
"github.com/jetstack/tarmak/pkg/tarmak/interfaces"
1014
"github.com/jetstack/tarmak/pkg/tarmak/utils"
1115
)
1216

17+
const (
18+
TimeLayout = "2006-01-02_15-04-05"
19+
TarCCmd = "tar -czPf - %s"
20+
//TarXCmd = "cat | tar -xz | cat > /tmp/foo" //| cat > %s"
21+
TarXCmd = "cat > %s"
22+
)
23+
1324
func Prepare(tarmak interfaces.Tarmak, role string) (aliases []string, err error) {
1425
if err := tarmak.SSH().WriteConfig(tarmak.Cluster()); err != nil {
1526
return nil, err
@@ -47,3 +58,58 @@ func Prepare(tarmak interfaces.Tarmak, role string) (aliases []string, err error
4758

4859
return aliases, result.ErrorOrNil()
4960
}
61+
62+
func ReadTarFromStream(dest string, stream io.Reader, result *multierror.Error, errLock sync.Mutex) {
63+
gzr, err := gzip.NewReader(stream)
64+
if err != nil {
65+
66+
errLock.Lock()
67+
result = multierror.Append(result, err)
68+
errLock.Unlock()
69+
70+
return
71+
}
72+
73+
f, err := os.Create(dest)
74+
if err != nil {
75+
76+
errLock.Lock()
77+
result = multierror.Append(result, err)
78+
errLock.Unlock()
79+
80+
return
81+
}
82+
83+
if _, err := io.Copy(f, gzr); err != nil {
84+
85+
errLock.Lock()
86+
result = multierror.Append(result, err)
87+
errLock.Unlock()
88+
89+
return
90+
}
91+
}
92+
93+
func WriteTarToStream(src string, stream io.WriteCloser, result *multierror.Error, errLock sync.Mutex) {
94+
//defer stream.Close()
95+
96+
f, err := os.Open(src)
97+
if err != nil {
98+
99+
errLock.Lock()
100+
result = multierror.Append(result, err)
101+
errLock.Unlock()
102+
103+
return
104+
}
105+
106+
gzw := gzip.NewWriter(stream)
107+
if _, err := io.Copy(gzw, f); err != nil {
108+
109+
errLock.Lock()
110+
result = multierror.Append(result, err)
111+
errLock.Unlock()
112+
113+
return
114+
}
115+
}

0 commit comments

Comments
 (0)