diff --git a/.gitignore b/.gitignore index 8fcb790f1..1ceea7691 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ .idea/ snap xlog +vendor/ diff --git a/Gopkg.lock b/Gopkg.lock new file mode 100644 index 000000000..89ec8d790 --- /dev/null +++ b/Gopkg.lock @@ -0,0 +1,54 @@ +# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. + + +[[projects]] + digest = "1:318f1c959a8a740366fce4b1e1eb2fd914036b4af58fbd0a003349b305f118ad" + name = "github.com/golang/protobuf" + packages = ["proto"] + pruneopts = "UT" + revision = "b5d812f8a3706043e23a9cd5babf2e5423744d30" + version = "v1.3.1" + +[[projects]] + branch = "master" + digest = "1:76ee51c3f468493aff39dbacc401e8831fbb765104cbf613b89bef01cf4bad70" + name = "golang.org/x/net" + packages = ["context"] + pruneopts = "UT" + revision = "f4e77d36d62c17c2336347bb2670ddbd02d092b7" + +[[projects]] + digest = "1:f5b35141aaa32649e375fa72052f81c99808e2a58c77602948d82b06570a4f41" + name = "google.golang.org/appengine" + packages = [ + ".", + "datastore", + "internal", + "internal/app_identity", + "internal/base", + "internal/datastore", + "internal/log", + "internal/modules", + "internal/remote_api", + ] + pruneopts = "UT" + revision = "54a98f90d1c46b7731eb8fb305d2a321c30ef610" + version = "v1.5.0" + +[[projects]] + digest = "1:a7438a3f7f92a25e8fb1d67de9ff4fc3355b86249999bf131092b76823db13e6" + name = "gopkg.in/vmihailenco/msgpack.v2" + packages = [ + ".", + "codes", + ] + pruneopts = "UT" + revision = "f4f8982de4ef0de18be76456617cc3f5d8d8141e" + version = "v2.9.1" + +[solve-meta] + analyzer-name = "dep" + analyzer-version = 1 + input-imports = ["gopkg.in/vmihailenco/msgpack.v2"] + solver-name = "gps-cdcl" + solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml new file mode 100644 index 000000000..b169f0a76 --- /dev/null +++ b/Gopkg.toml @@ -0,0 +1,34 @@ +# Gopkg.toml example +# +# Refer to https://golang.github.io/dep/docs/Gopkg.toml.html +# for detailed Gopkg.toml documentation. +# +# required = ["github.com/user/thing/cmd/thing"] +# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] +# +# [[constraint]] +# name = "github.com/user/project" +# version = "1.0.0" +# +# [[constraint]] +# name = "github.com/user/project2" +# branch = "dev" +# source = "github.com/myfork/project2" +# +# [[override]] +# name = "github.com/x/y" +# version = "2.4.0" +# +# [prune] +# non-go = false +# go-tests = true +# unused-packages = true + + +[[constraint]] + name = "gopkg.in/vmihailenco/msgpack.v2" + version = "2.9.1" + +[prune] + go-tests = true + unused-packages = true diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 000000000..2c57b3041 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,11 @@ +version: "3.3" + +services: + tarantool_queue: + image: tarantool/tarantool:1.10.3 + container_name: tarantool_queue + command: ["tarantool", "/opt/tarantool/entrypoint/config.lua"] + volumes: + - ./queue/config.lua:/opt/tarantool/entrypoint/config.lua + ports: + - "3013:3013" diff --git a/queue/config.lua b/queue/config.lua index 2488ff300..62a538856 100644 --- a/queue/config.lua +++ b/queue/config.lua @@ -1,29 +1,28 @@ queue = require 'queue' -box.cfg{ - listen = 3013, - wal_dir='xlog', - snap_dir='snap', +box.cfg { + listen = '*:3013'; } box.once("init", function() -box.schema.user.create('test', {password = 'test'}) -box.schema.func.create('queue.tube.test_queue:ack') -box.schema.func.create('queue.tube.test_queue:put') -box.schema.func.create('queue.tube.test_queue:drop') -box.schema.func.create('queue.tube.test_queue:peek') -box.schema.func.create('queue.tube.test_queue:kick') -box.schema.func.create('queue.tube.test_queue:take') -box.schema.func.create('queue.tube.test_queue:delete') -box.schema.func.create('queue.tube.test_queue:release') -box.schema.func.create('queue.tube.test_queue:bury') -box.schema.func.create('queue.statistics') -box.schema.user.grant('test', 'execute', 'universe') -box.schema.user.grant('test', 'read,write', 'space', '_queue') -box.schema.user.grant('test', 'read,write', 'space', '_schema') -box.schema.user.grant('test', 'read,write', 'space', '_space') -box.schema.user.grant('test', 'read,write', 'space', '_index') -box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers') -box.schema.user.grant('test', 'read,write', 'space', '_priv') -box.schema.user.grant('test', 'read,write', 'space', '_queue_taken') + box.schema.user.create('test', { password = 'test' }) + box.schema.func.create('queue.tube.test_queue:ack') + box.schema.func.create('queue.tube.test_queue:put') + box.schema.func.create('queue.tube.test_queue:drop') + box.schema.func.create('queue.tube.test_queue:peek') + box.schema.func.create('queue.tube.test_queue:kick') + box.schema.func.create('queue.tube.test_queue:take') + box.schema.func.create('queue.tube.test_queue:delete') + box.schema.func.create('queue.tube.test_queue:release') + box.schema.func.create('queue.tube.test_queue:bury') + box.schema.func.create('queue.statistics') + box.schema.user.grant('test', 'create,execute', 'universe') + box.schema.user.grant('test', 'create,read,write', 'space', '_queue') + box.schema.user.grant('test', 'read,write', 'space', '_space_sequence') + box.schema.user.grant('test', 'read,write', 'space', '_schema') + box.schema.user.grant('test', 'create,read,write', 'space', '_space') + box.schema.user.grant('test', 'read,write', 'space', '_index') + box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers') + box.schema.user.grant('test', 'read,write', 'space', '_priv') + box.schema.user.grant('test', 'read,write', 'space', '_queue_taken') end) diff --git a/queue/queue.go b/queue/queue.go index e32309952..5b29ac770 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -103,6 +103,7 @@ type Opts struct { Ttl time.Duration // task time to live Ttr time.Duration // task time to execute Delay time.Duration // delayed execution + Utube string } func (opts Opts) toMap() map[string]interface{} { @@ -123,6 +124,10 @@ func (opts Opts) toMap() map[string]interface{} { if opts.Pri != 0 { ret["pri"] = opts.Pri } + + if opts.Utube != "" { + ret["utube"] = opts.Utube + } return ret } @@ -276,8 +281,8 @@ func (q *queue) Kick(count uint64) (uint64, error) { // Delete the task identified by its id. func (q *queue) Delete(taskId uint64) error { - _, err := q._delete(taskId) - return err + _, err := q._delete(taskId) + return err } // Return the number of tasks in a queue broken down by task_state, and the number of requests broken down by the type of request. diff --git a/queue/queue_test.go b/queue/queue_test.go index 535faec5f..afe39c957 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -2,6 +2,7 @@ package queue_test import ( "fmt" + "math" "testing" "time" @@ -11,10 +12,13 @@ import ( ) var server = "127.0.0.1:3013" + var opts = Opts{ - Timeout: 500 * time.Millisecond, - User: "test", - Pass: "test", + Timeout: 5 * time.Second, + User: "test", + Pass: "test", + MaxReconnects: 10, + Reconnect: 500 * time.Millisecond, //Concurrency: 32, //RateLimit: 4*1024, } @@ -747,3 +751,73 @@ func TestTtlQueue_Put(t *testing.T) { } } } + +func TestUtube_Put(t *testing.T) { + conn, err := Connect(server, opts) + if err != nil { + t.Errorf("Failed to connect: %s", err.Error()) + return + } + if conn == nil { + t.Errorf("conn is nil after Connect") + return + } + defer conn.Close() + + name := "test_utube" + cfg := queue.Cfg{ + Temporary: true, + Kind: queue.UTUBE, + IfNotExists: true, + } + q := queue.New(conn, name) + if err = q.Create(cfg); err != nil { + t.Errorf("Failed to create queue: %s", err.Error()) + return + } + defer func() { + //Drop + err := q.Drop() + if err != nil { + t.Errorf("Failed drop queue: %s", err.Error()) + } + }() + + data1 := &customData{"test-data-0"} + _, err = q.PutWithOpts(data1, queue.Opts{Utube: "test-utube-consumer-key"}) + if err != nil { + t.Fatalf("Failed put task to queue: %s", err.Error()) + } + data2 := &customData{"test-data-1"} + _, err = q.PutWithOpts(data2, queue.Opts{Utube: "test-utube-consumer-key"}) + if err != nil { + t.Fatalf("Failed put task to queue: %s", err.Error()) + } + + go func() { + t1, err := q.TakeTimeout(2 * time.Second) + if err != nil { + t.Fatalf("Failed to take task from utube: %s", err.Error()) + } + + time.Sleep(2 * time.Second) + if err := t1.Ack(); err != nil { + t.Fatalf("Failed to ack task: %s", err.Error()) + } + }() + + time.Sleep(100 * time.Millisecond) + // the queue should be blocked for ~2 seconds + start := time.Now() + t2, err := q.TakeTimeout(2 * time.Second) + if err != nil { + t.Fatalf("Failed to take task from utube: %s", err.Error()) + } + if err := t2.Ack(); err != nil { + t.Fatalf("Failed to ack task: %s", err.Error()) + } + end := time.Now() + if math.Abs(float64(end.Sub(start)-2*time.Second)) > float64(200*time.Millisecond) { + t.Fatalf("Blocking time is less than expected: actual = %.2fs, expected = 1s", end.Sub(start).Seconds()) + } +}