From 93957a05a23b29e7a82b8fa2eca99f533d7a5e77 Mon Sep 17 00:00:00 2001 From: andrew Date: Mon, 6 May 2019 00:18:14 +0300 Subject: [PATCH 1/2] Add utube handling --- .gitignore | 1 + Gopkg.lock | 54 ++++++++++++++++++++++++++++++ Gopkg.toml | 34 +++++++++++++++++++ docker-compose.yaml | 11 +++++++ queue/config.lua | 45 +++++++++++++------------ queue/queue.go | 9 +++-- queue/queue_test.go | 80 +++++++++++++++++++++++++++++++++++++++++++-- 7 files changed, 206 insertions(+), 28 deletions(-) create mode 100644 Gopkg.lock create mode 100644 Gopkg.toml create mode 100644 docker-compose.yaml diff --git a/.gitignore b/.gitignore index 8fcb790f1..1ceea7691 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ .idea/ snap xlog +vendor/ diff --git a/Gopkg.lock b/Gopkg.lock new file mode 100644 index 000000000..89ec8d790 --- /dev/null +++ b/Gopkg.lock @@ -0,0 +1,54 @@ +# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. + + +[[projects]] + digest = "1:318f1c959a8a740366fce4b1e1eb2fd914036b4af58fbd0a003349b305f118ad" + name = "github.com/golang/protobuf" + packages = ["proto"] + pruneopts = "UT" + revision = "b5d812f8a3706043e23a9cd5babf2e5423744d30" + version = "v1.3.1" + +[[projects]] + branch = "master" + digest = "1:76ee51c3f468493aff39dbacc401e8831fbb765104cbf613b89bef01cf4bad70" + name = "golang.org/x/net" + packages = ["context"] + pruneopts = "UT" + revision = "f4e77d36d62c17c2336347bb2670ddbd02d092b7" + +[[projects]] + digest = "1:f5b35141aaa32649e375fa72052f81c99808e2a58c77602948d82b06570a4f41" + name = "google.golang.org/appengine" + packages = [ + ".", + "datastore", + "internal", + "internal/app_identity", + "internal/base", + "internal/datastore", + "internal/log", + "internal/modules", + "internal/remote_api", + ] + pruneopts = "UT" + revision = "54a98f90d1c46b7731eb8fb305d2a321c30ef610" + version = "v1.5.0" + +[[projects]] + digest = "1:a7438a3f7f92a25e8fb1d67de9ff4fc3355b86249999bf131092b76823db13e6" + name = "gopkg.in/vmihailenco/msgpack.v2" + packages = [ + ".", + "codes", + ] + pruneopts = "UT" + revision = "f4f8982de4ef0de18be76456617cc3f5d8d8141e" + version = "v2.9.1" + +[solve-meta] + analyzer-name = "dep" + analyzer-version = 1 + input-imports = ["gopkg.in/vmihailenco/msgpack.v2"] + solver-name = "gps-cdcl" + solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml new file mode 100644 index 000000000..b169f0a76 --- /dev/null +++ b/Gopkg.toml @@ -0,0 +1,34 @@ +# Gopkg.toml example +# +# Refer to https://golang.github.io/dep/docs/Gopkg.toml.html +# for detailed Gopkg.toml documentation. +# +# required = ["github.com/user/thing/cmd/thing"] +# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] +# +# [[constraint]] +# name = "github.com/user/project" +# version = "1.0.0" +# +# [[constraint]] +# name = "github.com/user/project2" +# branch = "dev" +# source = "github.com/myfork/project2" +# +# [[override]] +# name = "github.com/x/y" +# version = "2.4.0" +# +# [prune] +# non-go = false +# go-tests = true +# unused-packages = true + + +[[constraint]] + name = "gopkg.in/vmihailenco/msgpack.v2" + version = "2.9.1" + +[prune] + go-tests = true + unused-packages = true diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 000000000..2c57b3041 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,11 @@ +version: "3.3" + +services: + tarantool_queue: + image: tarantool/tarantool:1.10.3 + container_name: tarantool_queue + command: ["tarantool", "/opt/tarantool/entrypoint/config.lua"] + volumes: + - ./queue/config.lua:/opt/tarantool/entrypoint/config.lua + ports: + - "3013:3013" diff --git a/queue/config.lua b/queue/config.lua index 2488ff300..62a538856 100644 --- a/queue/config.lua +++ b/queue/config.lua @@ -1,29 +1,28 @@ queue = require 'queue' -box.cfg{ - listen = 3013, - wal_dir='xlog', - snap_dir='snap', +box.cfg { + listen = '*:3013'; } box.once("init", function() -box.schema.user.create('test', {password = 'test'}) -box.schema.func.create('queue.tube.test_queue:ack') -box.schema.func.create('queue.tube.test_queue:put') -box.schema.func.create('queue.tube.test_queue:drop') -box.schema.func.create('queue.tube.test_queue:peek') -box.schema.func.create('queue.tube.test_queue:kick') -box.schema.func.create('queue.tube.test_queue:take') -box.schema.func.create('queue.tube.test_queue:delete') -box.schema.func.create('queue.tube.test_queue:release') -box.schema.func.create('queue.tube.test_queue:bury') -box.schema.func.create('queue.statistics') -box.schema.user.grant('test', 'execute', 'universe') -box.schema.user.grant('test', 'read,write', 'space', '_queue') -box.schema.user.grant('test', 'read,write', 'space', '_schema') -box.schema.user.grant('test', 'read,write', 'space', '_space') -box.schema.user.grant('test', 'read,write', 'space', '_index') -box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers') -box.schema.user.grant('test', 'read,write', 'space', '_priv') -box.schema.user.grant('test', 'read,write', 'space', '_queue_taken') + box.schema.user.create('test', { password = 'test' }) + box.schema.func.create('queue.tube.test_queue:ack') + box.schema.func.create('queue.tube.test_queue:put') + box.schema.func.create('queue.tube.test_queue:drop') + box.schema.func.create('queue.tube.test_queue:peek') + box.schema.func.create('queue.tube.test_queue:kick') + box.schema.func.create('queue.tube.test_queue:take') + box.schema.func.create('queue.tube.test_queue:delete') + box.schema.func.create('queue.tube.test_queue:release') + box.schema.func.create('queue.tube.test_queue:bury') + box.schema.func.create('queue.statistics') + box.schema.user.grant('test', 'create,execute', 'universe') + box.schema.user.grant('test', 'create,read,write', 'space', '_queue') + box.schema.user.grant('test', 'read,write', 'space', '_space_sequence') + box.schema.user.grant('test', 'read,write', 'space', '_schema') + box.schema.user.grant('test', 'create,read,write', 'space', '_space') + box.schema.user.grant('test', 'read,write', 'space', '_index') + box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers') + box.schema.user.grant('test', 'read,write', 'space', '_priv') + box.schema.user.grant('test', 'read,write', 'space', '_queue_taken') end) diff --git a/queue/queue.go b/queue/queue.go index e32309952..5b29ac770 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -103,6 +103,7 @@ type Opts struct { Ttl time.Duration // task time to live Ttr time.Duration // task time to execute Delay time.Duration // delayed execution + Utube string } func (opts Opts) toMap() map[string]interface{} { @@ -123,6 +124,10 @@ func (opts Opts) toMap() map[string]interface{} { if opts.Pri != 0 { ret["pri"] = opts.Pri } + + if opts.Utube != "" { + ret["utube"] = opts.Utube + } return ret } @@ -276,8 +281,8 @@ func (q *queue) Kick(count uint64) (uint64, error) { // Delete the task identified by its id. func (q *queue) Delete(taskId uint64) error { - _, err := q._delete(taskId) - return err + _, err := q._delete(taskId) + return err } // Return the number of tasks in a queue broken down by task_state, and the number of requests broken down by the type of request. diff --git a/queue/queue_test.go b/queue/queue_test.go index 535faec5f..afe39c957 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -2,6 +2,7 @@ package queue_test import ( "fmt" + "math" "testing" "time" @@ -11,10 +12,13 @@ import ( ) var server = "127.0.0.1:3013" + var opts = Opts{ - Timeout: 500 * time.Millisecond, - User: "test", - Pass: "test", + Timeout: 5 * time.Second, + User: "test", + Pass: "test", + MaxReconnects: 10, + Reconnect: 500 * time.Millisecond, //Concurrency: 32, //RateLimit: 4*1024, } @@ -747,3 +751,73 @@ func TestTtlQueue_Put(t *testing.T) { } } } + +func TestUtube_Put(t *testing.T) { + conn, err := Connect(server, opts) + if err != nil { + t.Errorf("Failed to connect: %s", err.Error()) + return + } + if conn == nil { + t.Errorf("conn is nil after Connect") + return + } + defer conn.Close() + + name := "test_utube" + cfg := queue.Cfg{ + Temporary: true, + Kind: queue.UTUBE, + IfNotExists: true, + } + q := queue.New(conn, name) + if err = q.Create(cfg); err != nil { + t.Errorf("Failed to create queue: %s", err.Error()) + return + } + defer func() { + //Drop + err := q.Drop() + if err != nil { + t.Errorf("Failed drop queue: %s", err.Error()) + } + }() + + data1 := &customData{"test-data-0"} + _, err = q.PutWithOpts(data1, queue.Opts{Utube: "test-utube-consumer-key"}) + if err != nil { + t.Fatalf("Failed put task to queue: %s", err.Error()) + } + data2 := &customData{"test-data-1"} + _, err = q.PutWithOpts(data2, queue.Opts{Utube: "test-utube-consumer-key"}) + if err != nil { + t.Fatalf("Failed put task to queue: %s", err.Error()) + } + + go func() { + t1, err := q.TakeTimeout(2 * time.Second) + if err != nil { + t.Fatalf("Failed to take task from utube: %s", err.Error()) + } + + time.Sleep(2 * time.Second) + if err := t1.Ack(); err != nil { + t.Fatalf("Failed to ack task: %s", err.Error()) + } + }() + + time.Sleep(100 * time.Millisecond) + // the queue should be blocked for ~2 seconds + start := time.Now() + t2, err := q.TakeTimeout(2 * time.Second) + if err != nil { + t.Fatalf("Failed to take task from utube: %s", err.Error()) + } + if err := t2.Ack(); err != nil { + t.Fatalf("Failed to ack task: %s", err.Error()) + } + end := time.Now() + if math.Abs(float64(end.Sub(start)-2*time.Second)) > float64(200*time.Millisecond) { + t.Fatalf("Blocking time is less than expected: actual = %.2fs, expected = 1s", end.Sub(start).Seconds()) + } +} From bb3bd4109c7d34abfe1908933cea9ade8a085e7f Mon Sep 17 00:00:00 2001 From: "egor.iskandarov" Date: Wed, 13 Nov 2019 17:15:49 +0300 Subject: [PATCH 2/2] undo changes from previous commit that are unrelated to the utube handling PR --- .gitignore | 1 - Gopkg.lock | 54 --------------------------------------------- Gopkg.toml | 34 ---------------------------- docker-compose.yaml | 11 --------- queue/config.lua | 45 +++++++++++++++++++------------------ queue/queue.go | 2 +- queue/queue_test.go | 9 +++----- 7 files changed, 27 insertions(+), 129 deletions(-) delete mode 100644 Gopkg.lock delete mode 100644 Gopkg.toml delete mode 100644 docker-compose.yaml diff --git a/.gitignore b/.gitignore index 1ceea7691..8fcb790f1 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,3 @@ .idea/ snap xlog -vendor/ diff --git a/Gopkg.lock b/Gopkg.lock deleted file mode 100644 index 89ec8d790..000000000 --- a/Gopkg.lock +++ /dev/null @@ -1,54 +0,0 @@ -# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. - - -[[projects]] - digest = "1:318f1c959a8a740366fce4b1e1eb2fd914036b4af58fbd0a003349b305f118ad" - name = "github.com/golang/protobuf" - packages = ["proto"] - pruneopts = "UT" - revision = "b5d812f8a3706043e23a9cd5babf2e5423744d30" - version = "v1.3.1" - -[[projects]] - branch = "master" - digest = "1:76ee51c3f468493aff39dbacc401e8831fbb765104cbf613b89bef01cf4bad70" - name = "golang.org/x/net" - packages = ["context"] - pruneopts = "UT" - revision = "f4e77d36d62c17c2336347bb2670ddbd02d092b7" - -[[projects]] - digest = "1:f5b35141aaa32649e375fa72052f81c99808e2a58c77602948d82b06570a4f41" - name = "google.golang.org/appengine" - packages = [ - ".", - "datastore", - "internal", - "internal/app_identity", - "internal/base", - "internal/datastore", - "internal/log", - "internal/modules", - "internal/remote_api", - ] - pruneopts = "UT" - revision = "54a98f90d1c46b7731eb8fb305d2a321c30ef610" - version = "v1.5.0" - -[[projects]] - digest = "1:a7438a3f7f92a25e8fb1d67de9ff4fc3355b86249999bf131092b76823db13e6" - name = "gopkg.in/vmihailenco/msgpack.v2" - packages = [ - ".", - "codes", - ] - pruneopts = "UT" - revision = "f4f8982de4ef0de18be76456617cc3f5d8d8141e" - version = "v2.9.1" - -[solve-meta] - analyzer-name = "dep" - analyzer-version = 1 - input-imports = ["gopkg.in/vmihailenco/msgpack.v2"] - solver-name = "gps-cdcl" - solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml deleted file mode 100644 index b169f0a76..000000000 --- a/Gopkg.toml +++ /dev/null @@ -1,34 +0,0 @@ -# Gopkg.toml example -# -# Refer to https://golang.github.io/dep/docs/Gopkg.toml.html -# for detailed Gopkg.toml documentation. -# -# required = ["github.com/user/thing/cmd/thing"] -# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] -# -# [[constraint]] -# name = "github.com/user/project" -# version = "1.0.0" -# -# [[constraint]] -# name = "github.com/user/project2" -# branch = "dev" -# source = "github.com/myfork/project2" -# -# [[override]] -# name = "github.com/x/y" -# version = "2.4.0" -# -# [prune] -# non-go = false -# go-tests = true -# unused-packages = true - - -[[constraint]] - name = "gopkg.in/vmihailenco/msgpack.v2" - version = "2.9.1" - -[prune] - go-tests = true - unused-packages = true diff --git a/docker-compose.yaml b/docker-compose.yaml deleted file mode 100644 index 2c57b3041..000000000 --- a/docker-compose.yaml +++ /dev/null @@ -1,11 +0,0 @@ -version: "3.3" - -services: - tarantool_queue: - image: tarantool/tarantool:1.10.3 - container_name: tarantool_queue - command: ["tarantool", "/opt/tarantool/entrypoint/config.lua"] - volumes: - - ./queue/config.lua:/opt/tarantool/entrypoint/config.lua - ports: - - "3013:3013" diff --git a/queue/config.lua b/queue/config.lua index 62a538856..2488ff300 100644 --- a/queue/config.lua +++ b/queue/config.lua @@ -1,28 +1,29 @@ queue = require 'queue' -box.cfg { - listen = '*:3013'; +box.cfg{ + listen = 3013, + wal_dir='xlog', + snap_dir='snap', } box.once("init", function() - box.schema.user.create('test', { password = 'test' }) - box.schema.func.create('queue.tube.test_queue:ack') - box.schema.func.create('queue.tube.test_queue:put') - box.schema.func.create('queue.tube.test_queue:drop') - box.schema.func.create('queue.tube.test_queue:peek') - box.schema.func.create('queue.tube.test_queue:kick') - box.schema.func.create('queue.tube.test_queue:take') - box.schema.func.create('queue.tube.test_queue:delete') - box.schema.func.create('queue.tube.test_queue:release') - box.schema.func.create('queue.tube.test_queue:bury') - box.schema.func.create('queue.statistics') - box.schema.user.grant('test', 'create,execute', 'universe') - box.schema.user.grant('test', 'create,read,write', 'space', '_queue') - box.schema.user.grant('test', 'read,write', 'space', '_space_sequence') - box.schema.user.grant('test', 'read,write', 'space', '_schema') - box.schema.user.grant('test', 'create,read,write', 'space', '_space') - box.schema.user.grant('test', 'read,write', 'space', '_index') - box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers') - box.schema.user.grant('test', 'read,write', 'space', '_priv') - box.schema.user.grant('test', 'read,write', 'space', '_queue_taken') +box.schema.user.create('test', {password = 'test'}) +box.schema.func.create('queue.tube.test_queue:ack') +box.schema.func.create('queue.tube.test_queue:put') +box.schema.func.create('queue.tube.test_queue:drop') +box.schema.func.create('queue.tube.test_queue:peek') +box.schema.func.create('queue.tube.test_queue:kick') +box.schema.func.create('queue.tube.test_queue:take') +box.schema.func.create('queue.tube.test_queue:delete') +box.schema.func.create('queue.tube.test_queue:release') +box.schema.func.create('queue.tube.test_queue:bury') +box.schema.func.create('queue.statistics') +box.schema.user.grant('test', 'execute', 'universe') +box.schema.user.grant('test', 'read,write', 'space', '_queue') +box.schema.user.grant('test', 'read,write', 'space', '_schema') +box.schema.user.grant('test', 'read,write', 'space', '_space') +box.schema.user.grant('test', 'read,write', 'space', '_index') +box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers') +box.schema.user.grant('test', 'read,write', 'space', '_priv') +box.schema.user.grant('test', 'read,write', 'space', '_queue_taken') end) diff --git a/queue/queue.go b/queue/queue.go index 5b29ac770..34992f8df 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -124,7 +124,7 @@ func (opts Opts) toMap() map[string]interface{} { if opts.Pri != 0 { ret["pri"] = opts.Pri } - + if opts.Utube != "" { ret["utube"] = opts.Utube } diff --git a/queue/queue_test.go b/queue/queue_test.go index afe39c957..d1a909120 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -12,13 +12,10 @@ import ( ) var server = "127.0.0.1:3013" - var opts = Opts{ - Timeout: 5 * time.Second, - User: "test", - Pass: "test", - MaxReconnects: 10, - Reconnect: 500 * time.Millisecond, + Timeout: 500 * time.Millisecond, + User: "test", + Pass: "test", //Concurrency: 32, //RateLimit: 4*1024, }