...

Source file src/cloud.google.com/go/pubsub/example_test.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2014 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package pubsub_test
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"time"
    21  
    22  	"cloud.google.com/go/pubsub"
    23  	"google.golang.org/api/iterator"
    24  )
    25  
    26  func ExampleNewClient() {
    27  	ctx := context.Background()
    28  	_, err := pubsub.NewClient(ctx, "project-id")
    29  	if err != nil {
    30  		// TODO: Handle error.
    31  	}
    32  
    33  	// See the other examples to learn how to use the Client.
    34  }
    35  
    36  func ExampleClient_CreateTopic() {
    37  	ctx := context.Background()
    38  	client, err := pubsub.NewClient(ctx, "project-id")
    39  	if err != nil {
    40  		// TODO: Handle error.
    41  	}
    42  
    43  	// Create a new topic with the given name.
    44  	topic, err := client.CreateTopic(ctx, "topicName")
    45  	if err != nil {
    46  		// TODO: Handle error.
    47  	}
    48  
    49  	_ = topic // TODO: use the topic.
    50  }
    51  
    52  func ExampleClient_CreateTopicWithConfig() {
    53  	ctx := context.Background()
    54  	client, err := pubsub.NewClient(ctx, "project-id")
    55  	if err != nil {
    56  		// TODO: Handle error.
    57  	}
    58  
    59  	// Create a new topic with the given name and config.
    60  	topicConfig := &pubsub.TopicConfig{
    61  		KMSKeyName: "projects/project-id/locations/global/keyRings/my-key-ring/cryptoKeys/my-key",
    62  		MessageStoragePolicy: pubsub.MessageStoragePolicy{
    63  			AllowedPersistenceRegions: []string{"us-east1"},
    64  		},
    65  	}
    66  	topic, err := client.CreateTopicWithConfig(ctx, "topicName", topicConfig)
    67  	if err != nil {
    68  		// TODO: Handle error.
    69  	}
    70  	_ = topic // TODO: use the topic.
    71  }
    72  
    73  // Use TopicInProject to refer to a topic that is not in the client's project, such
    74  // as a public topic.
    75  func ExampleClient_TopicInProject() {
    76  	ctx := context.Background()
    77  	client, err := pubsub.NewClient(ctx, "project-id")
    78  	if err != nil {
    79  		// TODO: Handle error.
    80  	}
    81  	topic := client.TopicInProject("topicName", "another-project-id")
    82  	_ = topic // TODO: use the topic.
    83  }
    84  
    85  func ExampleClient_CreateSubscription() {
    86  	ctx := context.Background()
    87  	client, err := pubsub.NewClient(ctx, "project-id")
    88  	if err != nil {
    89  		// TODO: Handle error.
    90  	}
    91  
    92  	// Create a new topic with the given name.
    93  	topic, err := client.CreateTopic(ctx, "topicName")
    94  	if err != nil {
    95  		// TODO: Handle error.
    96  	}
    97  
    98  	// Create a new subscription to the previously created topic
    99  	// with the given name.
   100  	sub, err := client.CreateSubscription(ctx, "subName", pubsub.SubscriptionConfig{
   101  		Topic:            topic,
   102  		AckDeadline:      10 * time.Second,
   103  		ExpirationPolicy: 25 * time.Hour,
   104  	})
   105  	if err != nil {
   106  		// TODO: Handle error.
   107  	}
   108  
   109  	_ = sub // TODO: use the subscription.
   110  }
   111  
   112  func ExampleClient_CreateSubscription_neverExpire() {
   113  	ctx := context.Background()
   114  	client, err := pubsub.NewClient(ctx, "project-id")
   115  	if err != nil {
   116  		// TODO: Handle error.
   117  	}
   118  
   119  	// Create a new topic with the given name.
   120  	topic, err := client.CreateTopic(ctx, "topicName")
   121  	if err != nil {
   122  		// TODO: Handle error.
   123  	}
   124  
   125  	// Create a new subscription to the previously
   126  	// created topic and ensure it never expires.
   127  	sub, err := client.CreateSubscription(ctx, "subName", pubsub.SubscriptionConfig{
   128  		Topic:            topic,
   129  		AckDeadline:      10 * time.Second,
   130  		ExpirationPolicy: time.Duration(0),
   131  	})
   132  	if err != nil {
   133  		// TODO: Handle error.
   134  	}
   135  	_ = sub // TODO: Use the subscription
   136  }
   137  
   138  func ExampleTopic_Delete() {
   139  	ctx := context.Background()
   140  	client, err := pubsub.NewClient(ctx, "project-id")
   141  	if err != nil {
   142  		// TODO: Handle error.
   143  	}
   144  
   145  	topic := client.Topic("topicName")
   146  	if err := topic.Delete(ctx); err != nil {
   147  		// TODO: Handle error.
   148  	}
   149  }
   150  
   151  func ExampleTopic_Exists() {
   152  	ctx := context.Background()
   153  	client, err := pubsub.NewClient(ctx, "project-id")
   154  	if err != nil {
   155  		// TODO: Handle error.
   156  	}
   157  
   158  	topic := client.Topic("topicName")
   159  	ok, err := topic.Exists(ctx)
   160  	if err != nil {
   161  		// TODO: Handle error.
   162  	}
   163  	if !ok {
   164  		// Topic doesn't exist.
   165  	}
   166  }
   167  
   168  func ExampleTopic_Publish() {
   169  	ctx := context.Background()
   170  	client, err := pubsub.NewClient(ctx, "project-id")
   171  	if err != nil {
   172  		// TODO: Handle error.
   173  	}
   174  
   175  	topic := client.Topic("topicName")
   176  	defer topic.Stop()
   177  	var results []*pubsub.PublishResult
   178  	r := topic.Publish(ctx, &pubsub.Message{
   179  		Data: []byte("hello world"),
   180  	})
   181  	results = append(results, r)
   182  	// Do other work ...
   183  	for _, r := range results {
   184  		id, err := r.Get(ctx)
   185  		if err != nil {
   186  			// TODO: Handle error.
   187  		}
   188  		fmt.Printf("Published a message with a message ID: %s\n", id)
   189  	}
   190  }
   191  
   192  func ExampleTopic_Subscriptions() {
   193  	ctx := context.Background()
   194  	client, err := pubsub.NewClient(ctx, "project-id")
   195  	if err != nil {
   196  		// TODO: Handle error.
   197  	}
   198  	topic := client.Topic("topic-name")
   199  	// List all subscriptions of the topic (maybe of multiple projects).
   200  	for subs := topic.Subscriptions(ctx); ; {
   201  		sub, err := subs.Next()
   202  		if err == iterator.Done {
   203  			break
   204  		}
   205  		if err != nil {
   206  			// TODO: Handle error.
   207  		}
   208  		_ = sub // TODO: use the subscription.
   209  	}
   210  }
   211  
   212  func ExampleTopic_Update() {
   213  	ctx := context.Background()
   214  	client, err := pubsub.NewClient(ctx, "project-id")
   215  	if err != nil {
   216  		// TODO: Handle error.V
   217  	}
   218  	topic := client.Topic("topic-name")
   219  	topicConfig, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
   220  		MessageStoragePolicy: &pubsub.MessageStoragePolicy{
   221  			AllowedPersistenceRegions: []string{
   222  				"asia-east1", "asia-northeast1", "asia-southeast1", "australia-southeast1",
   223  				"europe-north1", "europe-west1", "europe-west2", "europe-west3", "europe-west4",
   224  				"us-central1", "us-central2", "us-east1", "us-east4", "us-west1", "us-west2"},
   225  		},
   226  	})
   227  	if err != nil {
   228  		// TODO: Handle error.
   229  	}
   230  	_ = topicConfig // TODO: Use TopicConfig
   231  }
   232  
   233  func ExampleTopic_Update_resetMessageStoragePolicy() {
   234  	ctx := context.Background()
   235  	client, err := pubsub.NewClient(ctx, "project-id")
   236  	if err != nil {
   237  		// TODO: Handle error.V
   238  	}
   239  	topic := client.Topic("topic-name")
   240  	topicConfig, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
   241  		// Just use a non-nil MessageStoragePolicy without any fields.
   242  		MessageStoragePolicy: &pubsub.MessageStoragePolicy{},
   243  	})
   244  	if err != nil {
   245  		// TODO: Handle error.
   246  	}
   247  	_ = topicConfig // TODO: Use TopicConfig
   248  }
   249  
   250  func ExampleSubscription_Delete() {
   251  	ctx := context.Background()
   252  	client, err := pubsub.NewClient(ctx, "project-id")
   253  	if err != nil {
   254  		// TODO: Handle error.
   255  	}
   256  
   257  	sub := client.Subscription("subName")
   258  	if err := sub.Delete(ctx); err != nil {
   259  		// TODO: Handle error.
   260  	}
   261  }
   262  
   263  func ExampleSubscription_Exists() {
   264  	ctx := context.Background()
   265  	client, err := pubsub.NewClient(ctx, "project-id")
   266  	if err != nil {
   267  		// TODO: Handle error.
   268  	}
   269  
   270  	sub := client.Subscription("subName")
   271  	ok, err := sub.Exists(ctx)
   272  	if err != nil {
   273  		// TODO: Handle error.
   274  	}
   275  	if !ok {
   276  		// Subscription doesn't exist.
   277  	}
   278  }
   279  
   280  func ExampleSubscription_Config() {
   281  	ctx := context.Background()
   282  	client, err := pubsub.NewClient(ctx, "project-id")
   283  	if err != nil {
   284  		// TODO: Handle error.
   285  	}
   286  	sub := client.Subscription("subName")
   287  	config, err := sub.Config(ctx)
   288  	if err != nil {
   289  		// TODO: Handle error.
   290  	}
   291  	fmt.Println(config)
   292  }
   293  
   294  func ExampleSubscription_Receive() {
   295  	ctx := context.Background()
   296  	client, err := pubsub.NewClient(ctx, "project-id")
   297  	if err != nil {
   298  		// TODO: Handle error.
   299  	}
   300  	sub := client.Subscription("subName")
   301  	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
   302  		// TODO: Handle message.
   303  		// NOTE: May be called concurrently; synchronize access to shared memory.
   304  		m.Ack()
   305  	})
   306  	if err != nil && err != context.Canceled {
   307  		// TODO: Handle error.
   308  	}
   309  }
   310  
   311  // This example shows how to configure keepalive so that unacknoweldged messages
   312  // expire quickly, allowing other subscribers to take them.
   313  func ExampleSubscription_Receive_maxExtension() {
   314  	ctx := context.Background()
   315  	client, err := pubsub.NewClient(ctx, "project-id")
   316  	if err != nil {
   317  		// TODO: Handle error.
   318  	}
   319  	sub := client.Subscription("subName")
   320  	// This program is expected to process and acknowledge messages in 30 seconds. If
   321  	// not, the Pub/Sub API will assume the message is not acknowledged.
   322  	sub.ReceiveSettings.MaxExtension = 30 * time.Second
   323  	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
   324  		// TODO: Handle message.
   325  		m.Ack()
   326  	})
   327  	if err != nil && err != context.Canceled {
   328  		// TODO: Handle error.
   329  	}
   330  }
   331  
   332  // This example shows how to throttle Subscription.Receive, which aims for high
   333  // throughput by default. By limiting the number of messages and/or bytes being
   334  // processed at once, you can bound your program's resource consumption.
   335  func ExampleSubscription_Receive_maxOutstanding() {
   336  	ctx := context.Background()
   337  	client, err := pubsub.NewClient(ctx, "project-id")
   338  	if err != nil {
   339  		// TODO: Handle error.
   340  	}
   341  	sub := client.Subscription("subName")
   342  	sub.ReceiveSettings.MaxOutstandingMessages = 5
   343  	sub.ReceiveSettings.MaxOutstandingBytes = 10e6
   344  	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
   345  		// TODO: Handle message.
   346  		m.Ack()
   347  	})
   348  	if err != nil && err != context.Canceled {
   349  		// TODO: Handle error.
   350  	}
   351  }
   352  
   353  func ExampleSubscription_Update() {
   354  	ctx := context.Background()
   355  	client, err := pubsub.NewClient(ctx, "project-id")
   356  	if err != nil {
   357  		// TODO: Handle error.
   358  	}
   359  	sub := client.Subscription("subName")
   360  	subConfig, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
   361  		PushConfig: &pubsub.PushConfig{Endpoint: "https://example.com/push"},
   362  		// Make the subscription never expire.
   363  		ExpirationPolicy: time.Duration(0),
   364  	})
   365  	if err != nil {
   366  		// TODO: Handle error.
   367  	}
   368  	_ = subConfig // TODO: Use SubscriptionConfig.
   369  }
   370  
   371  func ExampleSubscription_Update_pushConfigAuthenticationMethod() {
   372  	ctx := context.Background()
   373  	client, err := pubsub.NewClient(ctx, "project-id")
   374  	if err != nil {
   375  		// TODO: Handle error.
   376  	}
   377  	sub := client.Subscription("subName")
   378  	subConfig, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
   379  		PushConfig: &pubsub.PushConfig{
   380  			Endpoint: "https://example.com/push",
   381  			AuthenticationMethod: &pubsub.OIDCToken{
   382  				ServiceAccountEmail: "service-account-email",
   383  				Audience:            "client-12345",
   384  			},
   385  		},
   386  	})
   387  	if err != nil {
   388  		// TODO: Handle error.
   389  	}
   390  	_ = subConfig // TODO: Use SubscriptionConfig.
   391  }
   392  
   393  func ExampleSubscription_CreateSnapshot() {
   394  	ctx := context.Background()
   395  	client, err := pubsub.NewClient(ctx, "project-id")
   396  	if err != nil {
   397  		// TODO: Handle error.
   398  	}
   399  	sub := client.Subscription("subName")
   400  	snapConfig, err := sub.CreateSnapshot(ctx, "snapshotName")
   401  	if err != nil {
   402  		// TODO: Handle error.
   403  	}
   404  	_ = snapConfig // TODO: Use SnapshotConfig.
   405  }
   406  
   407  func ExampleSubscription_SeekToSnapshot() {
   408  	ctx := context.Background()
   409  	client, err := pubsub.NewClient(ctx, "project-id")
   410  	if err != nil {
   411  		// TODO: Handle error.
   412  	}
   413  	sub := client.Subscription("subName")
   414  	snap := client.Snapshot("snapshotName")
   415  	if err := sub.SeekToSnapshot(ctx, snap); err != nil {
   416  		// TODO: Handle error.
   417  	}
   418  }
   419  
   420  func ExampleSubscription_SeekToTime() {
   421  	ctx := context.Background()
   422  	client, err := pubsub.NewClient(ctx, "project-id")
   423  	if err != nil {
   424  		// TODO: Handle error.
   425  	}
   426  	sub := client.Subscription("subName")
   427  	if err := sub.SeekToTime(ctx, time.Now().Add(-time.Hour)); err != nil {
   428  		// TODO: Handle error.
   429  	}
   430  }
   431  
   432  func ExampleSnapshot_Delete() {
   433  	ctx := context.Background()
   434  	client, err := pubsub.NewClient(ctx, "project-id")
   435  	if err != nil {
   436  		// TODO: Handle error.
   437  	}
   438  
   439  	snap := client.Snapshot("snapshotName")
   440  	if err := snap.Delete(ctx); err != nil {
   441  		// TODO: Handle error.
   442  	}
   443  }
   444  
   445  func ExampleClient_Snapshots() {
   446  	ctx := context.Background()
   447  	client, err := pubsub.NewClient(ctx, "project-id")
   448  	if err != nil {
   449  		// TODO: Handle error.
   450  	}
   451  	// List all snapshots for the project.
   452  	iter := client.Snapshots(ctx)
   453  	_ = iter // TODO: iterate using Next.
   454  }
   455  
   456  func ExampleSnapshotConfigIterator_Next() {
   457  	ctx := context.Background()
   458  	client, err := pubsub.NewClient(ctx, "project-id")
   459  	if err != nil {
   460  		// TODO: Handle error.
   461  	}
   462  	// List all snapshots for the project.
   463  	iter := client.Snapshots(ctx)
   464  	for {
   465  		snapConfig, err := iter.Next()
   466  		if err == iterator.Done {
   467  			break
   468  		}
   469  		if err != nil {
   470  			// TODO: Handle error.
   471  		}
   472  		_ = snapConfig // TODO: use the SnapshotConfig.
   473  	}
   474  }
   475  
   476  // TODO(jba): write an example for PublishResult.Ready
   477  // TODO(jba): write an example for Subscription.IAM
   478  // TODO(jba): write an example for Topic.IAM
   479  // TODO(jba): write an example for Topic.Stop
   480  

View as plain text