Skip to content

Fix ordering of publishing #140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions coherence/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
pb1 "github.com/oracle/coherence-go-client/v2/proto/v1"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/wrapperspb"
"math/rand/v2"
"strings"
"sync"
)
Expand Down Expand Up @@ -161,8 +162,6 @@ func GetNamedTopic[V any](ctx context.Context, session *Session, topicName strin
return nil, getExistingError("NamedTopic", topicName)
}

// check any topic options

session.debug("using existing NamedTopic: %v", existing)
return existing, nil
}
Expand Down Expand Up @@ -190,6 +189,7 @@ type topicPublisher[V any] struct {
proxyID int32
publisherID int64
channelCount int32
defaultOrderingSeed int32
options *publisher.Options
valueSerializer Serializer[V]
mutex sync.RWMutex
Expand All @@ -213,7 +213,12 @@ func (tp *topicPublisher[V]) Publish(ctx context.Context, value V) (*publisher.P
return nil, ErrPublisherClosed
}

publishChannel := tp.ensureTopicChannel()
var publishChannel = tp.defaultOrderingSeed // use defaultOrderingSeed as default

if tp.defaultOrderingSeed == -1 {
// use the hash from ordering option
publishChannel = tp.ensureTopicChannel()
}

binValue, err := tp.valueSerializer.Serialize(value)
if err != nil {
Expand Down Expand Up @@ -327,16 +332,26 @@ func (bt *baseTopicsClient[V]) setReleased() {

func newPublisher[V any](session *Session, bt *baseTopicsClient[V], result *publisher.EnsurePublisherResult, topicName string, options *publisher.Options) (Publisher[V], error) {
tp := &topicPublisher[V]{
namedTopic: bt,
publisherID: result.PublisherID,
session: session,
options: options,
valueSerializer: NewSerializer[V](session.sessOpts.Format),
topicName: topicName,
proxyID: result.ProxyID,
channelCount: result.ChannelCount,
isClosed: false,
namedTopic: bt,
publisherID: result.PublisherID,
session: session,
options: options,
valueSerializer: NewSerializer[V](session.sessOpts.Format),
topicName: topicName,
proxyID: result.ProxyID,
defaultOrderingSeed: -1,
channelCount: result.ChannelCount,
isClosed: false,
}

ordering := options.GetOrdering()
if _, ok := ordering.(*publisher.OrderByDefault); ok {
// set the defaultOrderSeed to a non -1 value which means to use this number for the channel
// hash all the time.
// #nosec G404 -- math/rand is fine here for non-security use
tp.defaultOrderingSeed = rand.Int32() % tp.channelCount
}

session.mapMutex.Lock()
defer session.mapMutex.Unlock()
session.publishers[result.PublisherID] = tp
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ toolchain go1.23.7

require (
github.com/google/uuid v1.6.0
golang.org/x/text v0.26.0
golang.org/x/text v0.28.0
google.golang.org/grpc v1.73.0
google.golang.org/protobuf v1.36.6
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250528174236-200df99c418a h1:v2PbRU4K3llS09c7zodFpNePeamkAwG3mPrAery9VeE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250528174236-200df99c418a/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,31 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.Cluster;
import com.tangosol.net.Coherence;
import com.tangosol.net.CoherenceConfiguration;
import com.tangosol.net.DefaultCacheServer;
import com.tangosol.net.NamedCache;
import com.tangosol.net.*;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import com.tangosol.net.NamedMap;
import com.tangosol.net.SessionConfiguration;

import com.tangosol.net.management.MBeanServerProxy;

import com.tangosol.net.topic.NamedTopic;
import com.tangosol.net.topic.Publisher;
import com.tangosol.net.topic.Subscriber;

import static com.tangosol.net.topic.Subscriber.Name.inGroup;
import static com.tangosol.util.Base.log;

/**
* A simple Http server that is deployed into a Coherence cluster
* and can be used to perform various tests.
*
* @author jk 2019.08.09
* @author jk 2019.08.09
* @author tam 2022.02.08
*/
public class RestServer {
Expand Down Expand Up @@ -74,6 +76,7 @@ public static void main(String[] args) {
server.createContext("/isIsReadyPresent", RestServer::isIsReadyPresent);
server.createContext("/populateQueue", RestServer::populateQueue);
server.createContext("/destroyTopic", RestServer::destroyTopic);
server.createContext("/createCustomerTopic", RestServer::createCustomerTopic);

server.setExecutor(null); // creates a default executor
server.start();
Expand Down Expand Up @@ -143,6 +146,45 @@ private static void destroyTopic(HttpExchange t) throws IOException {
send(t, 200, "OK");
}

private static void createCustomerTopic(HttpExchange t) throws IOException {
try {
URI uri = t.getRequestURI();
String path = uri.getPath();
String[] pathComponents = path.split("/");

if (pathComponents.length < 4 || !pathComponents[pathComponents.length - 3].equals("createCustomerTopic")) {
t.sendResponseHeaders(400, -1); // Bad Request
return;
}

String topicName = pathComponents[pathComponents.length - 2];
int count = Integer.parseInt(pathComponents[pathComponents.length - 1]);

Coherence coherence = Coherence.getInstance();
if (coherence == null) {
coherence = Coherence.clusterMember().start().get();
}
Session session = coherence.getSession();

NamedTopic<Customer> topic = session.getTopic(topicName);
Publisher<Customer> publisher = topic.createPublisher();

for (int i = 0; i < count; i++) {
Customer customer = new Customer(i, "name-" + i, getAddress(i), getAddress(i), Customer.GOLD, 1000* i);
publisher.publish(customer).join();
}
publisher.close();
} catch (Exception e) {
e.printStackTrace();
send(t, 404, "Error: " + e.getMessage());
}
send(t, 200, "OK");
}

private static Address getAddress(int id) {
return new Address("address-line-1-" + id, "address-line-2-" + id, "Suburb-" + id, "City-" + id, "State-" + id, id);
}

private static void ready(HttpExchange t) throws IOException {
send(t, 200, "OK");
}
Expand Down
38 changes: 9 additions & 29 deletions test/e2e/standalone/java_object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,6 @@ import (
"testing"
)

type Customer struct {
Class string `json:"@class"`
ID int `json:"id"`
CustomerName string `json:"customerName"`
HomeAddress CustomerAddress `json:"homeAddress"`
PostalAddress CustomerAddress `json:"postalAddress"`
CustomerType string `json:"customerType"`
OutstandingBalance float32 `json:"outstandingBalance"`
}

type CustomerAddress struct {
Class string `json:"@class"`
AddressLine1 string `json:"addressLine1"`
AddressLine2 string `json:"addressLine2"`
Suburb string `json:"suburb"`
City string `json:"city"`
State string `json:"state"`
PostCode int `json:"postCode"`
}

// TestBasicOperationsAgainstMapAndCache runs all tests against NamedMap and NamedCache
func TestJavaSerializationAgainstMapAndCache(t *testing.T) {
g := gomega.NewWithT(t)
Expand All @@ -42,11 +22,11 @@ func TestJavaSerializationAgainstMapAndCache(t *testing.T) {

testCases := []struct {
testName string
nameMap coherence.NamedMap[int, Customer]
test func(t *testing.T, namedCache coherence.NamedMap[int, Customer])
nameMap coherence.NamedMap[int, utils.Customer]
test func(t *testing.T, namedCache coherence.NamedMap[int, utils.Customer])
}{
{"NamedMapSerializationTest", GetNamedMap[int, Customer](g, session, "customer-map"), RunSerializationTest},
{"NamedCacheSerializationTest", GetNamedCache[int, Customer](g, session, "customer-cache"), RunSerializationTest},
{"NamedMapSerializationTest", GetNamedMap[int, utils.Customer](g, session, "customer-map"), RunSerializationTest},
{"NamedCacheSerializationTest", GetNamedCache[int, utils.Customer](g, session, "customer-cache"), RunSerializationTest},
}
for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
Expand All @@ -55,10 +35,10 @@ func TestJavaSerializationAgainstMapAndCache(t *testing.T) {
}
}

func RunSerializationTest(t *testing.T, namedMap coherence.NamedMap[int, Customer]) {
func RunSerializationTest(t *testing.T, namedMap coherence.NamedMap[int, utils.Customer]) {
var (
g = gomega.NewWithT(t)
result *Customer
result *utils.Customer
err error
)

Expand All @@ -71,7 +51,7 @@ func RunSerializationTest(t *testing.T, namedMap coherence.NamedMap[int, Custome
err = namedMap.Clear(ctx)
g.Expect(err).ShouldNot(gomega.HaveOccurred())

homeAddress := CustomerAddress{
homeAddress := utils.CustomerAddress{
Class: addressClass,
AddressLine1: "123 James Street",
Suburb: "Balcatta",
Expand All @@ -80,7 +60,7 @@ func RunSerializationTest(t *testing.T, namedMap coherence.NamedMap[int, Custome
PostCode: 6000,
}

postalAddress := CustomerAddress{
postalAddress := utils.CustomerAddress{
Class: addressClass,
AddressLine1: "PO Box 1000",
AddressLine2: "Balcatta Post Office",
Expand All @@ -90,7 +70,7 @@ func RunSerializationTest(t *testing.T, namedMap coherence.NamedMap[int, Custome
PostCode: 6000,
}

customer := Customer{
customer := utils.Customer{
Class: customerClass,
ID: 1,
CustomerName: "Tim",
Expand Down
Loading
Loading